diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 262729c3f8..7c6683dfc1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -123,6 +123,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.data.write.bandwidthPerSec";
// A value of zero indicates no limit
public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
+ public static final String DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY =
+ "dfs.datanode.data.read.bandwidthPerSec";
+ // A value of zero indicates no limit
+ public static final long DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT = 0;
public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY =
"dfs.datanode.ec.reconstruct.read.bandwidthPerSec";
public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index bb61e8037e..42082019ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -57,6 +57,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
@@ -368,7 +370,8 @@ public class DataNode extends ReconfigurableBase
DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
- DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
+ DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
+ DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY));
public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";
@@ -702,6 +705,7 @@ public String reconfigurePropertyImpl(String property, String newVal)
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
case DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY:
case DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY:
+ case DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY:
return reconfDataXceiverParameters(property, newVal);
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
return reconfCacheReportParameters(property, newVal);
@@ -765,6 +769,18 @@ private String reconfDataXceiverParameters(String property, String newVal)
}
result = Long.toString(bandwidthPerSec);
getXferServer().setWriteThrottler(writeThrottler);
+ } else if (property.equals(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY)) {
+ Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
+ long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT :
+ Long.parseLong(newVal));
+ DataTransferThrottler readThrottler = null;
+ if (bandwidthPerSec > 0) {
+ readThrottler = new DataTransferThrottler(bandwidthPerSec);
+ } else {
+ bandwidthPerSec = 0;
+ }
+ result = Long.toString(bandwidthPerSec);
+ getXferServer().setReadThrottler(readThrottler);
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
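
For reference, a minimal sketch of driving this reconfiguration path at runtime, assuming a
DataNode handle dn as in the test changes below (reconfigureProperty() is inherited from
ReconfigurableBase and throws ReconfigurationException):

    // Apply a 64 MiB/s read limit, then revert to the default (0 = unlimited),
    // which clears the throttler.
    dn.reconfigureProperty(
        DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
        String.valueOf(64L * 1024 * 1024));
    dn.reconfigureProperty(
        DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, null);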
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index d8c55a54d4..e97e179702 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -608,7 +608,8 @@ public void readBlock(final ExtendedBlock block,
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long beginRead = Time.monotonicNow();
- read = blockSender.sendBlock(out, baseStream, null); // send data
+ // send data
+ read = blockSender.sendBlock(out, baseStream, dataXceiverServer.getReadThrottler());
long duration = Time.monotonicNow() - beginRead;
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
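
The throttler handed to sendBlock() above is applied per packet inside BlockSender, which is not
part of this patch. A minimal standalone sketch of the throttling semantics, using only
DataTransferThrottler's public API; the class name ReadThrottleSketch and the byte counts are
illustrative:

    import org.apache.hadoop.hdfs.util.DataTransferThrottler;

    public class ReadThrottleSketch {
      public static void main(String[] args) {
        // Limit to 8 MiB/s; throttle() blocks once the bytes consumed in the
        // current period would exceed the configured bandwidth.
        DataTransferThrottler throttler =
            new DataTransferThrottler(8L * 1024 * 1024);
        long totalBytes = 80L * 1024 * 1024; // ~10 seconds at 8 MiB/s
        long packetBytes = 64L * 1024;
        for (long sent = 0; sent < totalBytes; sent += packetBytes) {
          throttler.throttle(packetBytes); // sleeps as needed to hold the rate
        }
      }
    }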
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 9b31dd3b21..ad61f94b92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -172,6 +172,8 @@ void release() {
private volatile DataTransferThrottler writeThrottler;
+ private volatile DataTransferThrottler readThrottler;
+
/**
* Stores an estimate for block size to check if the disk partition has enough
* space. Newer clients pass the expected block size to the DataNode. For
@@ -221,6 +223,15 @@ private void initBandwidthPerSec(Configuration conf) {
} else {
this.writeThrottler = null;
}
+
+ bandwidthPerSec = conf.getLongBytes(
+ DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
+ DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT);
+ if (bandwidthPerSec > 0) {
+ this.readThrottler = new DataTransferThrottler(bandwidthPerSec);
+ } else {
+ this.readThrottler = null;
+ }
}
@Override
@@ -478,6 +489,10 @@ public DataTransferThrottler getWriteThrottler() {
return writeThrottler;
}
+ public DataTransferThrottler getReadThrottler() {
+ return readThrottler;
+ }
+
/**
* Release a peer.
*
@@ -535,4 +550,8 @@ public void setTransferThrottler(DataTransferThrottler transferThrottler) {
public void setWriteThrottler(DataTransferThrottler writeThrottler) {
this.writeThrottler = writeThrottler;
}
+
+ public void setReadThrottler(DataTransferThrottler readThrottler) {
+ this.readThrottler = readThrottler;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 9a75ec24d2..8d85d7cc3c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4721,6 +4721,15 @@
+<property>
+  <name>dfs.datanode.data.read.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Specifies the maximum amount of bandwidth that can be utilized for reading blocks.
+    When the bandwidth value is zero, there is no limit.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
   <value>0</value>
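
Because the key is registered as reconfigurable in DataNode.java above, a limit set in
hdfs-site.xml (hdfs-default.xml only documents the default) can be picked up without restarting
the DataNode; the host and IPC port below are placeholders:

    hdfs dfsadmin -reconfig datanode <dn_host:ipc_port> start
    hdfs dfsadmin -reconfig datanode <dn_host:ipc_port> status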
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
index 0ca5bedff8..14c1c301b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -31,6 +31,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
@@ -459,6 +460,13 @@ public void testDataXceiverReconfiguration()
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
+ try {
+ dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, "text");
+ fail("ReconfigurationException expected");
+ } catch (ReconfigurationException expected) {
+ assertTrue("expecting NumberFormatException",
+ expected.getCause() instanceof NumberFormatException);
+ }
// Change properties and verify change.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
@@ -477,6 +485,12 @@ public void testDataXceiverReconfiguration()
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getWriteThrottler().getBandwidth());
+ dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
+ String.valueOf(1000));
+ assertEquals(String.format("%s has wrong value",
+ DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
+ 1000, dn.getXferServer().getReadThrottler().getBandwidth());
+
// Revert to default.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
@@ -500,6 +514,14 @@ public void testDataXceiverReconfiguration()
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
+
+ dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, null);
+ assertEquals(String.format("%s has wrong value",
+ DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
+ null, dn.getXferServer().getReadThrottler());
+ assertNull(String.format("expect %s is not configured",
+ DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
+ dn.getConf().get(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferThrottler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferThrottler.java
new file mode 100644
index 0000000000..a3e4222d75
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataTransferThrottler.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for data transfer throttling related functions.
+ */
+public class TestDataTransferThrottler {
+
+ /**
+ * Test read data transfer throttler.
+ */
+ @Test
+ public void testReadDataTransferThrottler() throws Exception {
+ final HdfsConfiguration conf = new HdfsConfiguration();
+
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
+ cluster.waitActive();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+
+ // Create file.
+ Path file = new Path("/test");
+ long fileLength = 1024 * 1024 * 10 * 8;
+ DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0L);
+ DFSTestUtil.waitReplication(fs, file, (short) 1);
+
+ DataNode dataNode = cluster.getDataNodes().get(0);
+ // DataXceiverServer#readThrottler is null because
+ // dfs.datanode.data.read.bandwidthPerSec defaults to 0 (no limit).
+ Assert.assertNull(dataNode.xserver.getReadThrottler());
+
+ // Read file.
+ Assert.assertEquals(fileLength, DFSTestUtil.readFileAsBytes(fs, file).length);
+
+ // Set dfs.datanode.data.read.bandwidthPerSec.
+ long bandwidthPerSec = 1024 * 1024 * 8;
+ conf.setLong(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, bandwidthPerSec);
+
+ // Restart the first datanode.
+ cluster.stopDataNode(0);
+ cluster.startDataNodes(conf, 1, true, null, null);
+ dataNode = cluster.getDataNodes().get(0);
+ Assert.assertEquals(bandwidthPerSec, dataNode.xserver.getReadThrottler().getBandwidth());
+
+ // Read file with throttler.
+ long start = monotonicNow();
+ Assert.assertEquals(fileLength, DFSTestUtil.readFileAsBytes(fs, file).length);
+ long elapsedTime = monotonicNow() - start;
+ // Ensure the throttler is effective: reading 1024 * 1024 * 10 * 8 bytes
+ // at 1024 * 1024 * 8 bytes per second should take approximately 10 seconds.
+ long expectedElapsedTime = fileLength / bandwidthPerSec * 1000; // in milliseconds.
+ long acceptableError = 1000; // 1 second, allowing for a small margin of error.
+ assertTrue(elapsedTime >= expectedElapsedTime - acceptableError);
+ }
+ }
+}
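
Sanity check of the timing assertion: expectedElapsedTime = fileLength / bandwidthPerSec * 1000
= (1024 * 1024 * 10 * 8) / (1024 * 1024 * 8) * 1000 = 10000 ms, so with the 1000 ms margin the
throttled read must take at least 9 seconds for the test to pass.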
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index d7fee5f1f8..682e033bf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
- assertEquals(24, outs.size());
+ assertEquals(25, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}