diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0526f1e441..2e68cb6a1b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -143,6 +143,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY =
+      "dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume";
+  public static final int DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
   public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 7d5f33b8b8..db4987e25a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -30,6 +30,8 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -65,7 +67,7 @@ class FsDatasetAsyncDiskService {
   // ThreadPool core pool size
   private static final int CORE_THREADS_PER_VOLUME = 1;
   // ThreadPool maximum pool size
-  private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
+  private final int maxNumThreadsPerVolume;
   // ThreadPool keep-alive time for threads over core pool size
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
@@ -90,6 +92,12 @@ class FsDatasetAsyncDiskService {
     this.datanode = datanode;
     this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+    maxNumThreadsPerVolume = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);
+    Preconditions.checkArgument(maxNumThreadsPerVolume > 0,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY
+            + " must be a positive integer.");
   }
 
   private void addExecutorForVolume(final FsVolumeImpl volume) {
@@ -110,7 +118,7 @@ public Thread newThread(Runnable r) {
     };
 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(
-        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        CORE_THREADS_PER_VOLUME, maxNumThreadsPerVolume,
         THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(), threadFactory);
 
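For illustration, a minimal, hypothetical sketch of setting the new key from client or test code before a DataNode is constructed. The DFSConfigKeys constants come from the patch above; the class name and the value 8 are invented for the example, assuming Hadoop is on the classpath.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class AsyncDiskThreadsExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Raise the per-volume cap from the default of 4 to 8. The value is
        // read once, in the FsDatasetAsyncDiskService constructor.
        conf.setInt(
            DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
            8);
        // A value <= 0 would trip the Preconditions.checkArgument added in
        // that constructor and abort DataNode startup.
        System.out.println("max async disk threads per volume = " + conf.getInt(
            DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
            DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT));
      }
    }
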
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7bcbccd817..9422c1d6c3 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2982,6 +2982,16 @@
 </property>
 
+<property>
+  <name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
+  <value>4</value>
+  <description>
+    The maximum number of threads per volume used to process async disk
+    operations on the datanode. These threads consume I/O and CPU at the
+    same time. This will affect normal data node operations.
+  </description>
+</property>
+
 <property>
   <name>dfs.cachereport.intervalMsec</name>
   <value>10000</value>
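In practice, operators would override the default in hdfs-site.xml rather than editing hdfs-default.xml. A sketch, with 8 as an arbitrary example value:

    <property>
      <name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
      <value>8</value>
    </property>

Since the patch reads the key once in the FsDatasetAsyncDiskService constructor and does not register it as a reconfigurable property, a DataNode restart would be needed for a new value to take effect.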