diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4b2058bbcf..0826cef837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -119,6 +119,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.data.transfer.bandwidthPerSec";
   public static final long DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT =
       0; // A value of zero indicates no limit
+  public static final String DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY =
+      "dfs.datanode.data.write.bandwidthPerSec";
+  // A value of zero indicates no limit
+  public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
   @Deprecated
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
       HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a298e78d59..f322119533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -46,6 +46,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -2500,8 +2503,8 @@ public class DataNode extends ReconfigurableBase
     final String clientname;
     final CachingStrategy cachingStrategy;
 
-    /** Throttle to block replication when data transfers. */
-    private DataTransferThrottler transferThrottler;
+    /** Throttler for block replication during data transfers or writes. */
+    private DataTransferThrottler throttler;
 
     /**
      * Connect to the first item in the target list. Pass along the
@@ -2529,14 +2532,10 @@ public class DataNode extends ReconfigurableBase
       this.clientname = clientname;
       this.cachingStrategy =
           new CachingStrategy(true, getDnConf().readaheadLength);
-      // 1. the stage is PIPELINE_SETUP_CREATE,that is moving blocks, set
-      // throttler.
-      // 2. the stage is PIPELINE_SETUP_APPEND_RECOVERY or
-      // PIPELINE_SETUP_STREAMING_RECOVERY,
-      // that is writing and recovering pipeline, don't set throttle.
-      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
-          && clientname.isEmpty()) {
-        this.transferThrottler = xserver.getTransferThrottler();
+      if (isTransfer(stage, clientname)) {
+        this.throttler = xserver.getTransferThrottler();
+      } else if (isWrite(stage)) {
+        this.throttler = xserver.getWriteThrottler();
       }
     }
 
@@ -2596,7 +2595,7 @@ public class DataNode extends ReconfigurableBase
             targetStorageIds);
 
         // send data & checksum
-        blockSender.sendBlock(out, unbufOut, transferThrottler);
+        blockSender.sendBlock(out, unbufOut, throttler);
 
         // no response necessary
         LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}",
@@ -3739,4 +3738,26 @@ public class DataNode extends ReconfigurableBase
     }
     return this.diskBalancer;
   }
+
+  /**
+   * A DataTransfer constructed in {@link DataNode#transferBlock} uses
+   * BlockConstructionStage PIPELINE_SETUP_CREATE and an empty clientName.
+   */
+  private static boolean isTransfer(BlockConstructionStage stage,
+      String clientName) {
+    return stage == PIPELINE_SETUP_CREATE && clientName.isEmpty();
+  }
+
+  /**
+   * A DataTransfer constructed in
+   * {@link DataNode#transferReplicaForPipelineRecovery} runs with one of the
+   * recovery stages: PIPELINE_SETUP_APPEND_RECOVERY,
+   * PIPELINE_SETUP_STREAMING_RECOVERY or PIPELINE_CLOSE_RECOVERY. Since
+   * PIPELINE_CLOSE_RECOVERY never transfers a replica, only the two SETUP
+   * recovery stages count as writes.
+   */
+  private static boolean isWrite(BlockConstructionStage stage) {
+    return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
+        || stage == PIPELINE_SETUP_APPEND_RECOVERY);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3b049841fb..f9bc4fdcc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -905,8 +905,8 @@ class DataXceiver extends Receiver implements Runnable {
       // receive the block and mirror to the next target
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets, false);
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
+            dataXceiverServer.getWriteThrottler(), targets, false);
 
         // send close-ack for transfer-RBW/Finalized
         if (isTransfer) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 656bb3ea63..ea85a476a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -171,6 +171,8 @@ class DataXceiverServer implements Runnable {
 
   private final DataTransferThrottler transferThrottler;
 
+  private final DataTransferThrottler writeThrottler;
+
   /**
    * Stores an estimate for block size to check if the disk partition has enough
    * space. Newer clients pass the expected block size to the DataNode. For
@@ -205,6 +207,15 @@ class DataXceiverServer implements Runnable {
     } else {
       this.transferThrottler = null;
     }
+
+    bandwidthPerSec = conf.getLongBytes(
+        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT);
+    if (bandwidthPerSec > 0) {
+      this.writeThrottler = new DataTransferThrottler(bandwidthPerSec);
+    } else {
+      this.writeThrottler = null;
+    }
   }
 
   @Override
@@ -458,6 +469,10 @@ class DataXceiverServer implements Runnable {
     return transferThrottler;
   }
 
+  public DataTransferThrottler getWriteThrottler() {
+    return writeThrottler;
+  }
+
   /**
    * Release a peer.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d2ca0af59e..63a99d8e66 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4145,9 +4145,21 @@
   <name>dfs.datanode.data.transfer.bandwidthPerSec</name>
   <value>0</value>
   <description>
-    Specifies the maximum amount of bandwidth that each datanode can utilize for the data transfering purpose in term
-    of the number of bytes per second.
-    when the bandwidth value is zero, there is no limit.
+    Specifies the maximum amount of bandwidth, in bytes per second, that each datanode can utilize when
+    transferring a block (the BlockConstructionStage is PIPELINE_SETUP_CREATE and the client name is empty).
+    This limits DataNode-initiated replication traffic.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.data.write.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth, in bytes per second, that each datanode can utilize when
+    writing a block or recovering a pipeline (the BlockConstructionStage is PIPELINE_SETUP_APPEND_RECOVERY
+    or PIPELINE_SETUP_STREAMING_RECOVERY).
+    When the bandwidth value is zero, there is no limit.
   </description>
 </property>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
index 357f1ec22b..7859657efb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
@@ -42,6 +42,8 @@ import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
+
 /** Test transferring RBW between datanodes */
 public class TestTransferRbw {
   private static final Logger LOG =
@@ -102,13 +104,22 @@ public class TestTransferRbw {
     final String bpid = cluster.getNamesystem().getBlockPoolId();
     {
       final DataNode oldnode = cluster.getDataNodes().get(0);
+      // DataXceiverServer#writeThrottler is null when
+      // dfs.datanode.data.write.bandwidthPerSec keeps its default value of 0.
+      Assert.assertNull(oldnode.xserver.getWriteThrottler());
       oldrbw = getRbw(oldnode, bpid);
       LOG.info("oldrbw = " + oldrbw);
 
       //add a datanode
+      conf.setLong(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+          1024 * 1024 * 8);
       cluster.startDataNodes(conf, 1, true, null, null);
       newnode = cluster.getDataNodes().get(REPLICATION);
-
+      // DataXceiverServer#writeThrottler's bandwidth equals the configured
+      // dfs.datanode.data.write.bandwidthPerSec value when that value is
+      // non-zero.
+      Assert.assertEquals(1024 * 1024 * 8,
+          newnode.xserver.getWriteThrottler().getBandwidth());
       final DatanodeInfo oldnodeinfo;
       {
         final DatanodeInfo[] datatnodeinfos = cluster.getNameNodeRpc(
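
Reviewer note: the sketch below is not part of the patch. It condenses the pattern the patch applies in DataXceiverServer and DataXceiver: read the key with Configuration#getLongBytes, build a DataTransferThrottler only for a non-zero value, and hand that throttler to the block-receive path. It uses only Hadoop APIs already visible in the diff plus DataTransferThrottler#throttle; the class name WriteThrottlerSketch and the 8 MB/s figure are illustrative assumptions, not values from the patch.

// Sketch only: mirrors the constructor logic added to DataXceiverServer.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;

public class WriteThrottlerSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Operators would normally set this in hdfs-site.xml; 8 MB/s is just an example.
    conf.setLong("dfs.datanode.data.write.bandwidthPerSec", 8L * 1024 * 1024);

    long bandwidthPerSec =
        conf.getLongBytes("dfs.datanode.data.write.bandwidthPerSec", 0);
    // Zero means "no limit": no throttler instance is created at all, which
    // is why the test above asserts getWriteThrottler() is null by default.
    DataTransferThrottler writeThrottler = bandwidthPerSec > 0
        ? new DataTransferThrottler(bandwidthPerSec) : null;

    if (writeThrottler != null) {
      // BlockReceiver#receiveBlock carries the throttler down the write path;
      // throttle(numBytes) blocks the caller once the budget for the
      // configured bandwidth is exhausted in the current period.
      writeThrottler.throttle(64 * 1024);
      System.out.println("bandwidth = " + writeThrottler.getBandwidth());
    }
  }
}

The same non-zero guard already protects the existing transfer throttler, so the transfer and write limits can be tuned, or left unlimited, independently of each other.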