diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0ed1304cb8..bb61e8037e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -36,6 +36,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; @@ -362,7 +366,9 @@ public class DataNode extends ReconfigurableBase FS_GETSPACEUSED_JITTER_KEY, FS_GETSPACEUSED_CLASSNAME, DFS_DISK_BALANCER_ENABLED, - DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)); + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, + DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY)); public static final String METRICS_LOG_NAME = "DataNodeMetricsLog"; @@ -694,6 +700,8 @@ public String reconfigurePropertyImpl(String property, String newVal) case DFS_BLOCKREPORT_INITIAL_DELAY_KEY: return reconfBlockReportParameters(property, newVal); case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY: + case DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY: + case DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY: return reconfDataXceiverParameters(property, newVal); case DFS_CACHEREPORT_INTERVAL_MSEC_KEY: return reconfCacheReportParameters(property, newVal); @@ -724,14 +732,40 @@ public String reconfigurePropertyImpl(String property, String newVal) private String reconfDataXceiverParameters(String property, String newVal) throws ReconfigurationException { - String result; + String result = null; try { LOG.info("Reconfiguring {} to {}", property, newVal); - Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized."); - int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT : - Integer.parseInt(newVal)); - result = Integer.toString(threads); - getXferServer().setMaxXceiverCount(threads); + if (property.equals(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)) { + Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized."); + int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT : + Integer.parseInt(newVal)); + result = Integer.toString(threads); + getXferServer().setMaxXceiverCount(threads); + } else if (property.equals(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)) { + Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized."); + long bandwidthPerSec = (newVal == null ? + DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT : Long.parseLong(newVal)); + DataTransferThrottler transferThrottler = null; + if (bandwidthPerSec > 0) { + transferThrottler = new DataTransferThrottler(bandwidthPerSec); + } else { + bandwidthPerSec = 0; + } + result = Long.toString(bandwidthPerSec); + getXferServer().setTransferThrottler(transferThrottler); + } else if (property.equals(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY)) { + Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized."); + long bandwidthPerSec = (newVal == null ? DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT : + Long.parseLong(newVal)); + DataTransferThrottler writeThrottler = null; + if (bandwidthPerSec > 0) { + writeThrottler = new DataTransferThrottler(bandwidthPerSec); + } else { + bandwidthPerSec = 0; + } + result = Long.toString(bandwidthPerSec); + getXferServer().setWriteThrottler(writeThrottler); + } LOG.info("RECONFIGURE* changed {} to {}", property, newVal); return result; } catch (IllegalArgumentException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 3a67b76e43..9b31dd3b21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -168,9 +168,9 @@ void release() { final BlockBalanceThrottler balanceThrottler; - private final DataTransferThrottler transferThrottler; + private volatile DataTransferThrottler transferThrottler; - private final DataTransferThrottler writeThrottler; + private volatile DataTransferThrottler writeThrottler; /** * Stores an estimate for block size to check if the disk partition has enough @@ -200,7 +200,10 @@ void release() { DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT), conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT)); + initBandwidthPerSec(conf); + } + private void initBandwidthPerSec(Configuration conf) { long bandwidthPerSec = conf.getLongBytes( DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT); @@ -524,4 +527,12 @@ public void setMaxXceiverCount(int xceiverCount) { public int getMaxXceiverCount() { return maxXceiverCount; } + + public void setTransferThrottler(DataTransferThrottler transferThrottler) { + this.transferThrottler = transferThrottler; + } + + public void setWriteThrottler(DataTransferThrottler writeThrottler) { + this.writeThrottler = writeThrottler; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index a14ee2554f..0ca5bedff8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -31,6 +31,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT; @@ -443,20 +445,61 @@ public void testDataXceiverReconfiguration() assertTrue("expecting IllegalArgumentException", expected.getCause() instanceof IllegalArgumentException); } + try { + dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, "text"); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting NumberFormatException", + expected.getCause() instanceof NumberFormatException); + } + try { + dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, "text"); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting NumberFormatException", + expected.getCause() instanceof NumberFormatException); + } // Change properties and verify change. dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123)); assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), 123, dn.getXferServer().getMaxXceiverCount()); + dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, + String.valueOf(1000)); + assertEquals(String.format("%s has wrong value", + DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY), + 1000, dn.getXferServer().getTransferThrottler().getBandwidth()); + + dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, + String.valueOf(1000)); + assertEquals(String.format("%s has wrong value", + DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY), + 1000, dn.getXferServer().getWriteThrottler().getBandwidth()); + // Revert to default. dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null); assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount()); - assertNull(String.format("expect %s is not configured", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY), dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY)); + + dn.reconfigureProperty(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY, null); + assertEquals(String.format("%s has wrong value", + DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY), + null, dn.getXferServer().getTransferThrottler()); + assertNull(String.format("expect %s is not configured", + DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY), + dn.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)); + + dn.reconfigureProperty(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY, null); + assertEquals(String.format("%s has wrong value", + DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY), + null, dn.getXferServer().getWriteThrottler()); + assertNull(String.format("expect %s is not configured", + DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY), + dn.getConf().get(DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 78664e27ca..d7fee5f1f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(22, outs.size()); + assertEquals(24, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); }