HDFS-17075. Reconfig disk balancer parameters for datanode (#5823). Contributed by Haiyang Hu.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
b95595158f
commit
c44823dadb
@ -77,6 +77,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
|
||||
@ -356,7 +360,9 @@ public class DataNode extends ReconfigurableBase
|
||||
DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
|
||||
FS_DU_INTERVAL_KEY,
|
||||
FS_GETSPACEUSED_JITTER_KEY,
|
||||
FS_GETSPACEUSED_CLASSNAME));
|
||||
FS_GETSPACEUSED_CLASSNAME,
|
||||
DFS_DISK_BALANCER_ENABLED,
|
||||
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL));
|
||||
|
||||
public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";
|
||||
|
||||
@ -706,6 +712,9 @@ public class DataNode extends ReconfigurableBase
|
||||
case FS_GETSPACEUSED_JITTER_KEY:
|
||||
case FS_GETSPACEUSED_CLASSNAME:
|
||||
return reconfDfsUsageParameters(property, newVal);
|
||||
case DFS_DISK_BALANCER_ENABLED:
|
||||
case DFS_DISK_BALANCER_PLAN_VALID_INTERVAL:
|
||||
return reconfDiskBalancerParameters(property, newVal);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -951,6 +960,44 @@ public class DataNode extends ReconfigurableBase
|
||||
}
|
||||
}
|
||||
|
||||
private String reconfDiskBalancerParameters(String property, String newVal)
|
||||
throws ReconfigurationException {
|
||||
String result = null;
|
||||
try {
|
||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||
if (property.equals(DFS_DISK_BALANCER_ENABLED)) {
|
||||
if (newVal != null && !newVal.equalsIgnoreCase("true")
|
||||
&& !newVal.equalsIgnoreCase("false")) {
|
||||
throw new IllegalArgumentException("Not a valid Boolean value for " + property);
|
||||
}
|
||||
boolean enable = (newVal == null ? DFS_DISK_BALANCER_ENABLED_DEFAULT :
|
||||
Boolean.parseBoolean(newVal));
|
||||
getDiskBalancer().setDiskBalancerEnabled(enable);
|
||||
result = Boolean.toString(enable);
|
||||
} else if (property.equals(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)) {
|
||||
if (newVal == null) {
|
||||
// set to default
|
||||
long defaultInterval = getConf().getTimeDuration(
|
||||
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
getDiskBalancer().setPlanValidityInterval(defaultInterval);
|
||||
result = DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
|
||||
} else {
|
||||
long newInterval = getConf()
|
||||
.getTimeDurationHelper(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
newVal, TimeUnit.MILLISECONDS);
|
||||
getDiskBalancer().setPlanValidityInterval(newInterval);
|
||||
result = newVal;
|
||||
}
|
||||
}
|
||||
LOG.info("RECONFIGURE* changed {} to {}", property, result);
|
||||
return result;
|
||||
} catch (IllegalArgumentException | IOException e) {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of the keys of the re-configurable properties in configuration.
|
||||
*/
|
||||
@ -4201,7 +4248,8 @@ public class DataNode extends ReconfigurableBase
|
||||
return volumeInfoList;
|
||||
}
|
||||
|
||||
private DiskBalancer getDiskBalancer() throws IOException {
|
||||
@VisibleForTesting
|
||||
public DiskBalancer getDiskBalancer() throws IOException {
|
||||
if (this.diskBalancer == null) {
|
||||
throw new IOException("DiskBalancer is not initialized");
|
||||
}
|
||||
|
@ -82,14 +82,14 @@ public class DiskBalancer {
|
||||
private final BlockMover blockMover;
|
||||
private final ReentrantLock lock;
|
||||
private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
|
||||
private boolean isDiskBalancerEnabled = false;
|
||||
private volatile boolean isDiskBalancerEnabled = false;
|
||||
private ExecutorService scheduler;
|
||||
private Future future;
|
||||
private String planID;
|
||||
private String planFile;
|
||||
private DiskBalancerWorkStatus.Result currentResult;
|
||||
private long bandwidth;
|
||||
private long planValidityInterval;
|
||||
private volatile long planValidityInterval;
|
||||
private final Configuration config;
|
||||
|
||||
/**
|
||||
@ -341,6 +341,58 @@ public class DiskBalancer {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets Disk balancer is to enable or not to enable.
|
||||
*
|
||||
* @param diskBalancerEnabled
|
||||
* true, enable diskBalancer, otherwise false to disable it.
|
||||
*/
|
||||
public void setDiskBalancerEnabled(boolean diskBalancerEnabled) {
|
||||
isDiskBalancerEnabled = diskBalancerEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value indicating if diskBalancer is enabled.
|
||||
*
|
||||
* @return boolean.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public boolean isDiskBalancerEnabled() {
|
||||
return isDiskBalancerEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets maximum amount of time disk balancer plan is valid.
|
||||
*
|
||||
* @param planValidityInterval - maximum amount of time in the unit of milliseconds.
|
||||
*/
|
||||
public void setPlanValidityInterval(long planValidityInterval) {
|
||||
this.config.setTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
planValidityInterval, TimeUnit.MILLISECONDS);
|
||||
this.planValidityInterval = planValidityInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets maximum amount of time disk balancer plan is valid.
|
||||
*
|
||||
* @return the maximum amount of time in milliseconds.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public long getPlanValidityInterval() {
|
||||
return planValidityInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets maximum amount of time disk balancer plan is valid in config.
|
||||
*
|
||||
* @return the maximum amount of time in milliseconds.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public long getPlanValidityIntervalInConfig() {
|
||||
return config.getTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that user provided plan is valid.
|
||||
*
|
||||
|
@ -46,6 +46,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETE
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
@ -57,6 +61,7 @@ import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.ReconfigurationException;
|
||||
@ -794,4 +799,56 @@ public class TestDataNodeReconfiguration {
|
||||
Thread.sleep(5000);
|
||||
assertTrue(counter > lastCounter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDiskBalancerParameters() throws Exception {
|
||||
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||
DataNode dn = cluster.getDataNodes().get(i);
|
||||
|
||||
// Verify DFS_DISK_BALANCER_ENABLED.
|
||||
// Try invalid values.
|
||||
LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
"Could not change property dfs.disk.balancer.enabled from 'true' to 'text'",
|
||||
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text"));
|
||||
|
||||
// Set default value.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null);
|
||||
assertEquals(dn.getConf().getBoolean(DFS_DISK_BALANCER_ENABLED,
|
||||
DFS_DISK_BALANCER_ENABLED_DEFAULT), dn.getDiskBalancer().isDiskBalancerEnabled());
|
||||
|
||||
// Set DFS_DISK_BALANCER_ENABLED to false.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false");
|
||||
assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled());
|
||||
|
||||
// Set DFS_DISK_BALANCER_ENABLED to true.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true");
|
||||
assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled());
|
||||
|
||||
// Verify DFS_DISK_BALANCER_PLAN_VALID_INTERVAL.
|
||||
// Try invalid values.
|
||||
LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
"Could not change property dfs.disk.balancer.plan.valid.interval from " +
|
||||
"'1d' to 'text'",
|
||||
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text"));
|
||||
|
||||
// Set default value.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null);
|
||||
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
|
||||
dn.getDiskBalancer().getPlanValidityInterval());
|
||||
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
|
||||
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
|
||||
dn.getDiskBalancer().getPlanValidityIntervalInConfig());
|
||||
|
||||
// Set value is 6 then 6 milliseconds.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "" + 6);
|
||||
assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval());
|
||||
assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
|
||||
|
||||
// Set value with time unit.
|
||||
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m");
|
||||
assertEquals(60000, dn.getDiskBalancer().getPlanValidityInterval());
|
||||
assertEquals(60000, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ public class TestDiskBalancerRPC {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testgetDiskBalancerBandwidth() throws Exception {
|
||||
public void testGetDiskBalancerBandwidth() throws Exception {
|
||||
RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
|
||||
DataNode dataNode = rpcTestHelper.getDataNode();
|
||||
String planHash = rpcTestHelper.getPlanHash();
|
||||
|
@ -346,7 +346,7 @@ public class TestDFSAdmin {
|
||||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("datanode", address, outs, errs);
|
||||
assertEquals(20, outs.size());
|
||||
assertEquals(22, outs.size());
|
||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user