From b62a460fd956249cff0423d612d5df54f53cbb2c Mon Sep 17 00:00:00 2001
From: daimin
Date: Sat, 23 Apr 2022 11:45:22 +0800
Subject: [PATCH] HDFS-16519. Add throttler to EC reconstruction (#4101)

Reviewed-by: litao
Signed-off-by: Takanobu Asanuma
(cherry picked from commit aebd55f7883c6b12afe5faeb776ab0e0b83420da)
---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  8 ++++++++
 .../hadoop/hdfs/server/datanode/DataNode.java | 21 +++++++++++++++++++
 .../StripedBlockReconstructor.java            |  8 ++++++++
 .../datanode/erasurecode/StripedReader.java   |  5 +++++
 .../src/main/resources/hdfs-default.xml       | 18 ++++++++++++++++++
 5 files changed, 60 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7df1c5cfb7..e3f4bfcde8 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -123,6 +123,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.data.write.bandwidthPerSec";
   // A value of zero indicates no limit
   public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
+  public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.ec.reconstruct.read.bandwidthPerSec";
+  public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
+  public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ccdef5302a..ee7f6e3a4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -472,6 +472,9 @@ private static Tracer createTracer(Configuration conf) {
 
   private long startTime = 0;
 
+  private DataTransferThrottler ecReconstuctReadThrottler;
+  private DataTransferThrottler ecReconstuctWriteThrottler;
+
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -580,6 +583,16 @@ public Map<String, Long> load(String key) {
 
     initOOBTimeout();
     this.storageLocationChecker = storageLocationChecker;
+    long ecReconstuctReadBandwidth = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT);
+    long ecReconstuctWriteBandwidth = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT);
+    this.ecReconstuctReadThrottler = ecReconstuctReadBandwidth > 0 ?
+        new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null;
+    this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ?
+        new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null;
   }
 
   @Override // ReconfigurableBase
@@ -3717,6 +3730,14 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
 
+  public DataTransferThrottler getEcReconstuctReadThrottler() {
+    return ecReconstuctReadThrottler;
+  }
+
+  public DataTransferThrottler getEcReconstuctWriteThrottler() {
+    return ecReconstuctWriteThrottler;
+  }
+
   /**
    * Check the disk error synchronously.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index 3ead793542..ecd6351b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -95,6 +95,10 @@ void reconstruct() throws IOException {
           (int) Math.min(getStripedReader().getBufferSize(), remaining);
 
       long start = Time.monotonicNow();
+      long bytesToRead = (long) toReconstructLen * getStripedReader().getMinRequiredSources();
+      if (getDatanode().getEcReconstuctReadThrottler() != null) {
+        getDatanode().getEcReconstuctReadThrottler().throttle(bytesToRead);
+      }
       // step1: read from minimum source DNs required for reconstruction.
       // The returned success list is the source DNs we do real read from
       getStripedReader().readMinimumSources(toReconstructLen);
@@ -105,6 +109,10 @@ void reconstruct() throws IOException {
       long decodeEnd = Time.monotonicNow();
 
       // step3: transfer data
+      long bytesToWrite = (long) toReconstructLen * stripedWriter.getTargets();
+      if (getDatanode().getEcReconstuctWriteThrottler() != null) {
+        getDatanode().getEcReconstuctWriteThrottler().throttle(bytesToWrite);
+      }
       if (stripedWriter.transferData2Targets() == 0) {
         String error = "Transfer failed for all targets.";
         throw new IOException(error);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index b1992ea1ac..a302f5e868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -508,4 +508,9 @@ CachingStrategy getCachingStrategy() {
   int getXmits() {
     return xmits;
   }
+
+  public int getMinRequiredSources() {
+    return minRequiredSources;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 44b4678a9f..0b573ddaf6 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4635,6 +4635,24 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for reading.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.ec.reconstruct.write.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for writing.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.fsdataset.factory</name>
   <value></value>
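
Usage note: the hdfs-site.xml sketch below shows how an operator might enable both throttles; the 50m values are illustrative only, not recommendations. Because the DataNode reads these keys with conf.getLongBytes(), both plain byte counts (e.g. 52428800) and binary-suffixed values (e.g. 50m for 50 MiB/s) are accepted. Per reconstruction loop iteration, the read throttle is charged toReconstructLen * getMinRequiredSources() bytes and the write throttle toReconstructLen * getTargets() bytes, as in the hunks above.

<property>
  <name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
  <!-- Illustrative cap of 50 MiB/s on EC reconstruction reads. -->
  <value>50m</value>
</property>

<property>
  <name>dfs.datanode.ec.reconstruct.write.bandwidthPerSec</name>
  <!-- Illustrative cap of 50 MiB/s on EC reconstruction writes. -->
  <value>50m</value>
</property>

With the default value of 0, each throttler is left null and EC reconstruction runs unthrottled, so existing clusters see no behavior change until a limit is configured explicitly.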