diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 2096f18d31..0ed1304cb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -77,6 +77,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; @@ -356,7 +360,9 @@ public class DataNode extends ReconfigurableBase DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, FS_DU_INTERVAL_KEY, FS_GETSPACEUSED_JITTER_KEY, - FS_GETSPACEUSED_CLASSNAME)); + FS_GETSPACEUSED_CLASSNAME, + DFS_DISK_BALANCER_ENABLED, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)); public static final String METRICS_LOG_NAME = "DataNodeMetricsLog"; @@ -706,6 +712,9 @@ public String reconfigurePropertyImpl(String property, String newVal) case FS_GETSPACEUSED_JITTER_KEY: case FS_GETSPACEUSED_CLASSNAME: return reconfDfsUsageParameters(property, newVal); + case DFS_DISK_BALANCER_ENABLED: + case DFS_DISK_BALANCER_PLAN_VALID_INTERVAL: + return reconfDiskBalancerParameters(property, newVal); default: break; } @@ -951,6 +960,44 @@ private String reconfDfsUsageParameters(String property, String newVal) } } + private String reconfDiskBalancerParameters(String property, String newVal) + throws ReconfigurationException { + String result = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + if (property.equals(DFS_DISK_BALANCER_ENABLED)) { + if (newVal != null && !newVal.equalsIgnoreCase("true") + && !newVal.equalsIgnoreCase("false")) { + throw new IllegalArgumentException("Not a valid Boolean value for " + property); + } + boolean enable = (newVal == null ? DFS_DISK_BALANCER_ENABLED_DEFAULT : + Boolean.parseBoolean(newVal)); + getDiskBalancer().setDiskBalancerEnabled(enable); + result = Boolean.toString(enable); + } else if (property.equals(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)) { + if (newVal == null) { + // set to default + long defaultInterval = getConf().getTimeDuration( + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + getDiskBalancer().setPlanValidityInterval(defaultInterval); + result = DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; + } else { + long newInterval = getConf() + .getTimeDurationHelper(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + newVal, TimeUnit.MILLISECONDS); + getDiskBalancer().setPlanValidityInterval(newInterval); + result = newVal; + } + } + LOG.info("RECONFIGURE* changed {} to {}", property, result); + return result; + } catch (IllegalArgumentException | IOException e) { + throw new ReconfigurationException(property, newVal, getConf().get(property), e); + } + } + /** * Get a list of the keys of the re-configurable properties in configuration. */ @@ -4201,7 +4248,8 @@ public List getVolumeReport() throws IOException { return volumeInfoList; } - private DiskBalancer getDiskBalancer() throws IOException { + @VisibleForTesting + public DiskBalancer getDiskBalancer() throws IOException { if (this.diskBalancer == null) { throw new IOException("DiskBalancer is not initialized"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 4126140678..e2f9877483 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -82,14 +82,14 @@ public class DiskBalancer { private final BlockMover blockMover; private final ReentrantLock lock; private final ConcurrentHashMap workMap; - private boolean isDiskBalancerEnabled = false; + private volatile boolean isDiskBalancerEnabled = false; private ExecutorService scheduler; private Future future; private String planID; private String planFile; private DiskBalancerWorkStatus.Result currentResult; private long bandwidth; - private long planValidityInterval; + private volatile long planValidityInterval; private final Configuration config; /** @@ -341,6 +341,58 @@ private void checkDiskBalancerEnabled() } } + /** + * Sets Disk balancer is to enable or not to enable. + * + * @param diskBalancerEnabled + * true, enable diskBalancer, otherwise false to disable it. + */ + public void setDiskBalancerEnabled(boolean diskBalancerEnabled) { + isDiskBalancerEnabled = diskBalancerEnabled; + } + + /** + * Returns the value indicating if diskBalancer is enabled. + * + * @return boolean. + */ + @VisibleForTesting + public boolean isDiskBalancerEnabled() { + return isDiskBalancerEnabled; + } + + /** + * Sets maximum amount of time disk balancer plan is valid. + * + * @param planValidityInterval - maximum amount of time in the unit of milliseconds. + */ + public void setPlanValidityInterval(long planValidityInterval) { + this.config.setTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + planValidityInterval, TimeUnit.MILLISECONDS); + this.planValidityInterval = planValidityInterval; + } + + /** + * Gets maximum amount of time disk balancer plan is valid. + * + * @return the maximum amount of time in milliseconds. + */ + @VisibleForTesting + public long getPlanValidityInterval() { + return planValidityInterval; + } + + /** + * Gets maximum amount of time disk balancer plan is valid in config. + * + * @return the maximum amount of time in milliseconds. + */ + @VisibleForTesting + public long getPlanValidityIntervalInConfig() { + return config.getTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + } + /** * Verifies that user provided plan is valid. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index d9578ca02a..a14ee2554f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -46,6 +46,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -57,6 +61,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.ReconfigurationException; @@ -794,4 +799,56 @@ public void testDfsUsageKlass() throws ReconfigurationException, InterruptedExce Thread.sleep(5000); assertTrue(counter > lastCounter); } + + @Test + public void testDiskBalancerParameters() throws Exception { + for (int i = 0; i < NUM_DATA_NODE; i++) { + DataNode dn = cluster.getDataNodes().get(i); + + // Verify DFS_DISK_BALANCER_ENABLED. + // Try invalid values. + LambdaTestUtils.intercept(ReconfigurationException.class, + "Could not change property dfs.disk.balancer.enabled from 'true' to 'text'", + () -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text")); + + // Set default value. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null); + assertEquals(dn.getConf().getBoolean(DFS_DISK_BALANCER_ENABLED, + DFS_DISK_BALANCER_ENABLED_DEFAULT), dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Set DFS_DISK_BALANCER_ENABLED to false. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false"); + assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Set DFS_DISK_BALANCER_ENABLED to true. + dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true"); + assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled()); + + // Verify DFS_DISK_BALANCER_PLAN_VALID_INTERVAL. + // Try invalid values. + LambdaTestUtils.intercept(ReconfigurationException.class, + "Could not change property dfs.disk.balancer.plan.valid.interval from " + + "'1d' to 'text'", + () -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text")); + + // Set default value. + dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null); + assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS), + dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, + DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS), + dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + + // Set value is 6 then 6 milliseconds. + dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "" + 6); + assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + + // Set value with time unit. + dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m"); + assertEquals(60000, dn.getDiskBalancer().getPlanValidityInterval()); + assertEquals(60000, dn.getDiskBalancer().getPlanValidityIntervalInConfig()); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 21c9a5937e..258f812dff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -197,7 +197,7 @@ public void testGetDiskBalancerInvalidSetting() throws Exception { } @Test - public void testgetDiskBalancerBandwidth() throws Exception { + public void testGetDiskBalancerBandwidth() throws Exception { RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke(); DataNode dataNode = rpcTestHelper.getDataNode(); String planHash = rpcTestHelper.getPlanHash(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 4b1db53e88..78664e27ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(20, outs.size()); + assertEquals(22, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); }