HDFS-16519. Add throttler to EC reconstruction (#4101)

Reviewed-by: litao <tomleescut@gmail.com>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit aebd55f788)
daimin 2022-04-23 11:45:22 +08:00 committed by Takanobu Asanuma
parent 9132eeb4dd
commit b62a460fd9
5 changed files with 60 additions and 0 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -123,6 +123,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.data.write.bandwidthPerSec";
   // A value of zero indicates no limit
   public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
+  public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.ec.reconstruct.read.bandwidthPerSec";
+  public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
+  public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
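
Note: these keys are parsed with Configuration.getLongBytes (see the DataNode hunk below), so values can be plain byte counts or carry binary size suffixes such as "50m". A minimal sketch of that parsing behavior, assuming only hadoop-common on the classpath (the class name and values are illustrative, not part of this change):

import org.apache.hadoop.conf.Configuration;

public class EcThrottleConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // "50m" parses as 50 * 1024 * 1024 bytes; 0 (the default) means no limit.
    conf.set("dfs.datanode.ec.reconstruct.read.bandwidthPerSec", "50m");
    conf.set("dfs.datanode.ec.reconstruct.write.bandwidthPerSec", "20m");
    long readBps = conf.getLongBytes(
        "dfs.datanode.ec.reconstruct.read.bandwidthPerSec", 0);
    System.out.println(readBps); // prints 52428800
  }
}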

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -472,6 +472,9 @@ private static Tracer createTracer(Configuration conf) {
   private long startTime = 0;
 
+  private DataTransferThrottler ecReconstuctReadThrottler;
+  private DataTransferThrottler ecReconstuctWriteThrottler;
+
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -580,6 +583,16 @@ public Map<String, Long> load(String key) {
     initOOBTimeout();
     this.storageLocationChecker = storageLocationChecker;
+    long ecReconstuctReadBandwidth = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT);
+    long ecReconstuctWriteBandwidth = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT);
+    this.ecReconstuctReadThrottler = ecReconstuctReadBandwidth > 0 ?
+        new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null;
+    this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ?
+        new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null;
   }
 
   @Override // ReconfigurableBase
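
Note: DataTransferThrottler is the byte-budget throttler HDFS already uses elsewhere (e.g. for image transfer); its constructor takes a check period in milliseconds and a sustained bytes-per-second budget, and throttle(n) blocks the caller once a period's budget is spent. A minimal standalone sketch of that contract as relied on above (rates and loop sizes are illustrative):

import org.apache.hadoop.hdfs.util.DataTransferThrottler;

public class ThrottlerContractSketch {
  public static void main(String[] args) {
    // 10 MiB/s budget, accounted in 100 ms periods, as in the hunk above.
    DataTransferThrottler throttler =
        new DataTransferThrottler(100, 10L * 1024 * 1024);
    long start = System.currentTimeMillis();
    for (int i = 0; i < 320; i++) {
      // Claim 64 KiB per iteration; 320 * 64 KiB = 20 MiB total, so the
      // loop should take roughly two seconds at 10 MiB/s.
      throttler.throttle(64 * 1024);
    }
    System.out.println((System.currentTimeMillis() - start) + " ms");
  }
}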
@@ -3717,6 +3730,14 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
 
+  public DataTransferThrottler getEcReconstuctReadThrottler() {
+    return ecReconstuctReadThrottler;
+  }
+
+  public DataTransferThrottler getEcReconstuctWriteThrottler() {
+    return ecReconstuctWriteThrottler;
+  }
+
   /**
    * Check the disk error synchronously.
    */

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java

@@ -95,6 +95,10 @@ void reconstruct() throws IOException {
           (int) Math.min(getStripedReader().getBufferSize(), remaining);
       long start = Time.monotonicNow();
+      long bytesToRead = (long) toReconstructLen * getStripedReader().getMinRequiredSources();
+      if (getDatanode().getEcReconstuctReadThrottler() != null) {
+        getDatanode().getEcReconstuctReadThrottler().throttle(bytesToRead);
+      }
       // step1: read from minimum source DNs required for reconstruction.
       // The returned success list is the source DNs we do real read from
       getStripedReader().readMinimumSources(toReconstructLen);
@@ -105,6 +109,10 @@ void reconstruct() throws IOException {
       long decodeEnd = Time.monotonicNow();
       // step3: transfer data
+      long bytesToWrite = (long) toReconstructLen * stripedWriter.getTargets();
+      if (getDatanode().getEcReconstuctWriteThrottler() != null) {
+        getDatanode().getEcReconstuctWriteThrottler().throttle(bytesToWrite);
+      }
       if (stripedWriter.transferData2Targets() == 0) {
         String error = "Transfer failed for all targets.";
         throw new IOException(error);
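
Note: the two hunks above charge the throttler before each read and transfer step, scaling one buffer's length by the number of sources read and targets written. A worked sketch of that accounting, assuming an RS-6-3 policy, a 64 KiB buffer, and a single failed block (all values hypothetical):

public class ReconstructionBudgetSketch {
  public static void main(String[] args) {
    int toReconstructLen = 64 * 1024; // bytes decoded per pass
    int minRequiredSources = 6;       // data units needed to decode RS-6-3
    int targets = 1;                  // blocks being rebuilt

    long bytesToRead = (long) toReconstructLen * minRequiredSources;
    long bytesToWrite = (long) toReconstructLen * targets;
    System.out.println(bytesToRead);  // 393216 bytes claimed from read budget
    System.out.println(bytesToWrite); // 65536 bytes claimed from write budget
    // At dfs.datanode.ec.reconstruct.read.bandwidthPerSec = 1 MiB/s, each
    // pass is paced to about 393216 / 1048576 = 0.375 s of read budget.
  }
}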

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java

@@ -508,4 +508,9 @@ CachingStrategy getCachingStrategy() {
   int getXmits() {
     return xmits;
   }
+
+  public int getMinRequiredSources() {
+    return minRequiredSources;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4635,6 +4635,24 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for reading.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.ec.reconstruct.write.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for writing.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.fsdataset.factory</name>
   <value></value>
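
Note: both properties default to 0, so the throttles stay off until an operator opts in. A hedged example of enabling them via hdfs-site.xml (the byte values are illustrative, not recommendations):

<property>
  <name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
  <!-- 52428800 bytes = 50 MiB/s cap on EC reconstruction reads -->
  <value>52428800</value>
</property>
<property>
  <name>dfs.datanode.ec.reconstruct.write.bandwidthPerSec</name>
  <!-- 20971520 bytes = 20 MiB/s cap on EC reconstruction writes -->
  <value>20971520</value>
</property>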