HDFS-16519. Add throttler to EC reconstruction (#4101)
Reviewed-by: litao <tomleescut@gmail.com> Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
5ebbacc480
commit
aebd55f788
@ -123,6 +123,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
"dfs.datanode.data.write.bandwidthPerSec";
|
"dfs.datanode.data.write.bandwidthPerSec";
|
||||||
// A value of zero indicates no limit
|
// A value of zero indicates no limit
|
||||||
public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
|
public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
|
||||||
|
public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY =
|
||||||
|
"dfs.datanode.ec.reconstruct.read.bandwidthPerSec";
|
||||||
|
public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT =
|
||||||
|
0; // A value of zero indicates no limit
|
||||||
|
public static final String DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY =
|
||||||
|
"dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
|
||||||
|
public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
|
||||||
|
0; // A value of zero indicates no limit
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
|
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
|
||||||
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
|
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
|
||||||
|
@ -470,6 +470,9 @@ private static Tracer createTracer(Configuration conf) {
|
|||||||
|
|
||||||
private long startTime = 0;
|
private long startTime = 0;
|
||||||
|
|
||||||
|
private DataTransferThrottler ecReconstuctReadThrottler;
|
||||||
|
private DataTransferThrottler ecReconstuctWriteThrottler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a dummy DataNode for testing purpose.
|
* Creates a dummy DataNode for testing purpose.
|
||||||
*/
|
*/
|
||||||
@ -584,6 +587,16 @@ public Map<String, Long> load(String key) {
|
|||||||
|
|
||||||
initOOBTimeout();
|
initOOBTimeout();
|
||||||
this.storageLocationChecker = storageLocationChecker;
|
this.storageLocationChecker = storageLocationChecker;
|
||||||
|
long ecReconstuctReadBandwidth = conf.getLongBytes(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT);
|
||||||
|
long ecReconstuctWriteBandwidth = conf.getLongBytes(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT);
|
||||||
|
this.ecReconstuctReadThrottler = ecReconstuctReadBandwidth > 0 ?
|
||||||
|
new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null;
|
||||||
|
this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ?
|
||||||
|
new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // ReconfigurableBase
|
@Override // ReconfigurableBase
|
||||||
@ -3830,6 +3843,14 @@ public ShortCircuitRegistry getShortCircuitRegistry() {
|
|||||||
return shortCircuitRegistry;
|
return shortCircuitRegistry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public DataTransferThrottler getEcReconstuctReadThrottler() {
|
||||||
|
return ecReconstuctReadThrottler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DataTransferThrottler getEcReconstuctWriteThrottler() {
|
||||||
|
return ecReconstuctWriteThrottler;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check the disk error synchronously.
|
* Check the disk error synchronously.
|
||||||
*/
|
*/
|
||||||
|
@ -95,6 +95,10 @@ void reconstruct() throws IOException {
|
|||||||
(int) Math.min(getStripedReader().getBufferSize(), remaining);
|
(int) Math.min(getStripedReader().getBufferSize(), remaining);
|
||||||
|
|
||||||
long start = Time.monotonicNow();
|
long start = Time.monotonicNow();
|
||||||
|
long bytesToRead = (long) toReconstructLen * getStripedReader().getMinRequiredSources();
|
||||||
|
if (getDatanode().getEcReconstuctReadThrottler() != null) {
|
||||||
|
getDatanode().getEcReconstuctReadThrottler().throttle(bytesToRead);
|
||||||
|
}
|
||||||
// step1: read from minimum source DNs required for reconstruction.
|
// step1: read from minimum source DNs required for reconstruction.
|
||||||
// The returned success list is the source DNs we do real read from
|
// The returned success list is the source DNs we do real read from
|
||||||
getStripedReader().readMinimumSources(toReconstructLen);
|
getStripedReader().readMinimumSources(toReconstructLen);
|
||||||
@ -105,6 +109,10 @@ void reconstruct() throws IOException {
|
|||||||
long decodeEnd = Time.monotonicNow();
|
long decodeEnd = Time.monotonicNow();
|
||||||
|
|
||||||
// step3: transfer data
|
// step3: transfer data
|
||||||
|
long bytesToWrite = (long) toReconstructLen * stripedWriter.getTargets();
|
||||||
|
if (getDatanode().getEcReconstuctWriteThrottler() != null) {
|
||||||
|
getDatanode().getEcReconstuctWriteThrottler().throttle(bytesToWrite);
|
||||||
|
}
|
||||||
if (stripedWriter.transferData2Targets() == 0) {
|
if (stripedWriter.transferData2Targets() == 0) {
|
||||||
String error = "Transfer failed for all targets.";
|
String error = "Transfer failed for all targets.";
|
||||||
throw new IOException(error);
|
throw new IOException(error);
|
||||||
|
@ -508,4 +508,9 @@ CachingStrategy getCachingStrategy() {
|
|||||||
int getXmits() {
|
int getXmits() {
|
||||||
return xmits;
|
return xmits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMinRequiredSources() {
|
||||||
|
return minRequiredSources;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4724,6 +4724,24 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>
|
||||||
|
Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for reading.
|
||||||
|
When the bandwidth value is zero, there is no limit.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.ec.reconstruct.write.bandwidthPerSec</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>
|
||||||
|
Specifies the maximum amount of bandwidth that the EC reconstruction can utilize for writing.
|
||||||
|
When the bandwidth value is zero, there is no limit.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.datanode.fsdataset.factory</name>
|
<name>dfs.datanode.fsdataset.factory</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
|
Loading…
Reference in New Issue
Block a user