HDFS-12904. Add DataTransferThrottler to the Datanode transfers. Contributed by Lisheng Sun.

Inigo Goiri 2019-09-05 11:44:02 -07:00
parent 511df1e837
commit 72d8b92ba5
4 changed files with 43 additions and 1 deletion

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -115,6 +115,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       = "dfs.datanode.balance.max.concurrent.moves";
   public static final int
       DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 100;
+  public static final String DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.data.transfer.bandwidthPerSec";
+  public static final long DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT =
+      0; // A value of zero indicates no limit
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
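For context on how this key pair is consumed: DataXceiverServer (further down) reads it with Configuration.getLongBytes, so the value is a byte count per second and, as with other byte-size keys read through getLongBytes, a binary size suffix such as "10m" should also be accepted. A minimal sketch of reading the setting, with illustrative class and variable names that are not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Illustrative sketch: resolve the configured transfer bandwidth.
    // 0 (the default) means "no throttling".
    public class TransferBandwidthSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // e.g. conf.set("dfs.datanode.data.transfer.bandwidthPerSec", "10m");
        long bandwidthPerSec = conf.getLongBytes(
            DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
            DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT);
        System.out.println("throttle enabled: " + (bandwidthPerSec > 0)
            + ", bytes/sec: " + bandwidthPerSec);
      }
    }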

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -112,6 +112,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -2499,6 +2500,9 @@ private class DataTransfer implements Runnable {
     final String clientname;
     final CachingStrategy cachingStrategy;
+    /** Throttle to block replication when data transfers. */
+    private DataTransferThrottler transferThrottler;
+
     /**
      * Connect to the first item in the target list. Pass along the
      * entire target list, the block, and the data.
@@ -2525,6 +2529,15 @@ private class DataTransfer implements Runnable {
       this.clientname = clientname;
       this.cachingStrategy =
           new CachingStrategy(true, getDnConf().readaheadLength);
+      // 1. If the stage is PIPELINE_SETUP_CREATE, i.e. the DataNode is moving
+      // (replicating) blocks, set the throttler.
+      // 2. If the stage is PIPELINE_SETUP_APPEND_RECOVERY or
+      // PIPELINE_SETUP_STREAMING_RECOVERY, i.e. a client is writing or the
+      // pipeline is being recovered, don't set the throttler.
+      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
+          && clientname.isEmpty()) {
+        this.transferThrottler = xserver.getTransferThrottler();
+      }
     }

     /**
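The gating condition above is the heart of the change: an empty clientname means the transfer was initiated by the DataNode itself (block replication) rather than by a client write pipeline, and PIPELINE_SETUP_CREATE is the stage used for such block moves. A rough restatement of the intent as a standalone predicate, illustrative only and not part of the patch:

    import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;

    // Illustrative predicate mirroring the constructor logic above: throttle
    // only DataNode-initiated replication transfers, never client writes or
    // pipeline recovery.
    final class TransferThrottlePolicy {
      static boolean shouldThrottle(BlockConstructionStage stage,
          String clientname) {
        return stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
            && clientname.isEmpty();
      }
    }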
@@ -2583,7 +2596,7 @@ public void run() {
             targetStorageIds);

         // send data & checksum
-        blockSender.sendBlock(out, unbufOut, null);
+        blockSender.sendBlock(out, unbufOut, transferThrottler);

         // no response necessary
         LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
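Passing transferThrottler instead of null is what actually enforces the limit: BlockSender consults the throttler as it streams packets, and DataTransferThrottler.throttle() sleeps the calling thread once the byte budget implied by the configured bandwidth is used up. A minimal sketch of that pattern (not the actual BlockSender code):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    // Illustrative copy loop: account for every chunk with the throttler,
    // which blocks when the configured bytes-per-second budget is exceeded.
    final class ThrottledCopySketch {
      static void copy(InputStream in, OutputStream out,
          DataTransferThrottler throttler) throws IOException {
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = in.read(buf)) > 0) {
          out.write(buf, 0, n);
          if (throttler != null) {   // a null throttler means "no limit"
            throttler.throttle(n);   // may sleep to honor the bandwidth cap
          }
        }
      }
    }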

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -169,6 +169,8 @@ void release() {

   final BlockBalanceThrottler balanceThrottler;

+  private final DataTransferThrottler transferThrottler;
+
   /**
    * Stores an estimate for block size to check if the disk partition has enough
    * space. Newer clients pass the expected block size to the DataNode. For
@@ -194,6 +196,15 @@ void release() {
             DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
             DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
+
+    long bandwidthPerSec = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT);
+    if (bandwidthPerSec > 0) {
+      this.transferThrottler = new DataTransferThrottler(bandwidthPerSec);
+    } else {
+      this.transferThrottler = null;
+    }
   }

   @Override
@@ -443,6 +454,10 @@ PeerServer getPeerServer() {
     return peerServer;
   }

+  public DataTransferThrottler getTransferThrottler() {
+    return transferThrottler;
+  }
+
   /**
    * Release a peer.
    *
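Note that DataXceiverServer creates a single DataTransferThrottler and hands the same instance to every DataTransfer thread via getTransferThrottler(). Since the throttler is designed to be shared across threads, the configured value acts as an aggregate cap on all concurrent replication transfers on the DataNode, not a per-stream cap. A small sketch of that sharing, with illustrative values:

    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    // Two threads share one throttler, as DataXceiverServer shares its
    // instance across DataTransfer threads: the 10 MB/s budget is split
    // between them rather than granted to each.
    final class SharedThrottlerSketch {
      public static void main(String[] args) throws InterruptedException {
        DataTransferThrottler shared =
            new DataTransferThrottler(10L * 1024 * 1024);
        Runnable transfer = () -> {
          for (int i = 0; i < 20; i++) {
            shared.throttle(1024 * 1024);  // account for 1 MB "sent"
          }
        };
        Thread t1 = new Thread(transfer);
        Thread t2 = new Thread(transfer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
      }
    }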

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4144,6 +4144,16 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that each datanode can use
+    for data transfer, in bytes per second.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.fsdataset.factory</name>
   <value></value>
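As a usage sketch, an operator who wants to cap DataNode replication traffic at roughly 10 MB/s would override the new property in hdfs-site.xml. The value below is illustrative; because the DataNode reads it with conf.getLongBytes, a size suffix such as 10m should also work.

    <property>
      <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
      <value>10485760</value>
    </property>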