HDFS-17087. Add Throttler for datanode reading block (#5845)

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Signed-off-by: Tao Li <tomscut@apache.org>
huhaiyang 2023-08-17 09:33:50 +08:00 committed by GitHub
parent 911e9e0c01
commit 65e4a66e25
8 changed files with 157 additions and 3 deletions

DFSConfigKeys.java

@@ -123,6 +123,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.data.write.bandwidthPerSec";
// A value of zero indicates no limit
public static final long DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT = 0;
public static final String DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY =
"dfs.datanode.data.read.bandwidthPerSec";
// A value of zero indicates no limit
public static final long DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT = 0;
public static final String DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_KEY =
"dfs.datanode.ec.reconstruct.read.bandwidthPerSec";
public static final long DFS_DATANODE_EC_RECONSTRUCT_READ_BANDWIDTHPERSEC_DEFAULT =

DataNode.java

@@ -57,6 +57,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
@@ -368,7 +370,8 @@ public class DataNode extends ReconfigurableBase
DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY));
public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";
@@ -702,6 +705,7 @@ public String reconfigurePropertyImpl(String property, String newVal)
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
case DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY:
case DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY:
case DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY:
return reconfDataXceiverParameters(property, newVal);
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
return reconfCacheReportParameters(property, newVal);
@@ -765,6 +769,18 @@ private String reconfDataXceiverParameters(String property, String newVal)
}
result = Long.toString(bandwidthPerSec);
getXferServer().setWriteThrottler(writeThrottler);
} else if (property.equals(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY)) {
Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT :
Long.parseLong(newVal));
DataTransferThrottler readThrottler = null;
if (bandwidthPerSec > 0) {
readThrottler = new DataTransferThrottler(bandwidthPerSec);
} else {
bandwidthPerSec = 0;
}
result = Long.toString(bandwidthPerSec);
getXferServer().setReadThrottler(readThrottler);
}
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
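
Since the key is registered as reconfigurable above, this limit can be changed on a running DataNode without a restart. For illustration, assuming a DataNode whose IPC endpoint is dn-host:9867 (9867 is the default DataNode IPC port; the hostname is a placeholder), the standard dfsadmin reconfiguration workflow would be:

hdfs dfsadmin -reconfig datanode dn-host:9867 start
hdfs dfsadmin -reconfig datanode dn-host:9867 status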

DataXceiver.java

@@ -608,7 +608,8 @@ public void readBlock(final ExtendedBlock block,
writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream()));
long beginRead = Time.monotonicNow();
// send data
read = blockSender.sendBlock(out, baseStream, dataXceiverServer.getReadThrottler());
long duration = Time.monotonicNow() - beginRead;
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
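
With this change, readBlock passes the server-wide read throttler to BlockSender#sendBlock instead of null. For context, DataTransferThrottler meters a stream by having the sender call throttle(numBytes) as data goes out; the call sleeps once the bytes sent in the current period exceed the configured budget. The sketch below illustrates that pattern in isolation, assuming a plain stream copy; it is not the actual BlockSender send loop (which also handles packet framing, checksums, and transferTo), and ThrottledCopy is a hypothetical name:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;

class ThrottledCopy {
  // Copies in to out while keeping throughput near the configured 8 MiB/s cap.
  static void copy(InputStream in, OutputStream out) throws IOException {
    DataTransferThrottler throttler = new DataTransferThrottler(8L * 1024 * 1024);
    byte[] buf = new byte[64 * 1024];
    int n;
    while ((n = in.read(buf)) > 0) {
      out.write(buf, 0, n);
      throttler.throttle(n); // blocks once the current period's byte budget is spent
    }
  }
}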

DataXceiverServer.java

@@ -172,6 +172,8 @@ void release() {
private volatile DataTransferThrottler writeThrottler;
private volatile DataTransferThrottler readThrottler;
/**
* Stores an estimate for block size to check if the disk partition has enough
* space. Newer clients pass the expected block size to the DataNode. For
@@ -221,6 +223,15 @@ private void initBandwidthPerSec(Configuration conf) {
} else {
this.writeThrottler = null;
}
bandwidthPerSec = conf.getLongBytes(
DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_DEFAULT);
if (bandwidthPerSec > 0) {
this.readThrottler = new DataTransferThrottler(bandwidthPerSec);
} else {
this.readThrottler = null;
}
}
@Override
@@ -478,6 +489,10 @@ public DataTransferThrottler getWriteThrottler() {
return writeThrottler;
}
public DataTransferThrottler getReadThrottler() {
return readThrottler;
}
/**
* Release a peer.
*
@@ -535,4 +550,8 @@ public void setTransferThrottler(DataTransferThrottler transferThrottler) {
public void setWriteThrottler(DataTransferThrottler writeThrottler) {
this.writeThrottler = writeThrottler;
}
public void setReadThrottler(DataTransferThrottler readThrottler) {
this.readThrottler = readThrottler;
}
}

hdfs-default.xml

@@ -4721,6 +4721,15 @@
</description>
</property>
<property>
<name>dfs.datanode.data.read.bandwidthPerSec</name>
<value>0</value>
<description>
Specifies the maximum amount of bandwidth that can be utilized for reading blocks.
When the bandwidth value is zero, there is no limit.
</description>
</property>
<property>
<name>dfs.datanode.ec.reconstruct.read.bandwidthPerSec</name>
<value>0</value>
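
As an illustration, an operator could cap each DataNode's block-read bandwidth at 64 MiB/s by setting the new property in hdfs-site.xml; the value is bytes per second (67108864 = 64 * 1024 * 1024 here, an example figure, not a recommendation):

<property>
  <name>dfs.datanode.data.read.bandwidthPerSec</name>
  <value>67108864</value>
</property>

Note that at startup the value is parsed with getLongBytes, so size suffixes are also accepted, while runtime reconfiguration parses a plain long (Long.parseLong above).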

TestDataNodeReconfiguration.java

@@ -31,6 +31,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
@@ -459,6 +460,13 @@ public void testDataXceiverReconfiguration()
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
// Change properties and verify change.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
@@ -477,6 +485,12 @@ public void testDataXceiverReconfiguration()
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getWriteThrottler().getBandwidth());
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
String.valueOf(1000));
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
1000, dn.getXferServer().getReadThrottler().getBandwidth());
// Revert to default.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
@@ -500,6 +514,14 @@ public void testDataXceiverReconfiguration()
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY));
dn.reconfigureProperty(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, null);
assertEquals(String.format("%s has wrong value",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
null, dn.getXferServer().getReadThrottler());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY),
dn.getConf().get(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY));
}
}

TestDataTransferThrottler.java

@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.junit.Assert.assertTrue;
/**
* Tests throttle the data transfers related functions.
*/
public class TestDataTransferThrottler {
/**
* Test read data transfer throttler.
*/
@Test
public void testReadDataTransferThrottler() throws Exception {
final HdfsConfiguration conf = new HdfsConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive();
final DistributedFileSystem fs = cluster.getFileSystem();
// Create file.
Path file = new Path("/test");
long fileLength = 1024 * 1024 * 10 * 8;
DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0L);
DFSTestUtil.waitReplication(fs, file, (short) 1);
DataNode dataNode = cluster.getDataNodes().get(0);
// DataXceiverServer#readThrottler is null if
// dfs.datanode.data.read.bandwidthPerSec default value is 0.
Assert.assertNull(dataNode.xserver.getReadThrottler());
// Read file.
Assert.assertEquals(fileLength, DFSTestUtil.readFileAsBytes(fs, file).length);
// Set dfs.datanode.data.read.bandwidthPerSec.
long bandwidthPerSec = 1024 * 1024 * 8;
conf.setLong(DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY, bandwidthPerSec);
// Restart the first datanode.
cluster.stopDataNode(0);
cluster.startDataNodes(conf, 1, true, null, null);
dataNode = cluster.getDataNodes().get(0);
Assert.assertEquals(bandwidthPerSec, dataNode.xserver.getReadThrottler().getBandwidth());
// Read file with throttler.
long start = monotonicNow();
Assert.assertEquals(fileLength, DFSTestUtil.readFileAsBytes(fs, file).length);
long elapsedTime = monotonicNow() - start;
// Ensure throttler is effective, read 1024 * 1024 * 10 * 8 bytes,
// should take approximately 10 seconds (1024 * 1024 * 8 bytes per second).
long expectedElapsedTime = fileLength / bandwidthPerSec * 1000; // in milliseconds.
long acceptableError = 1000; // 1 second, allowing for a small margin of error.
assertTrue(elapsedTime >= expectedElapsedTime - acceptableError);
}
}
}

TestDFSAdmin.java

@@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(25, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}