HDFS-16502. Reconfigure Block Invalidate limit (#4064)
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
8b8158f02d
commit
1c0bc35305
@ -314,18 +314,12 @@ public class DatanodeManager {
|
|||||||
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
|
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
|
||||||
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
|
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
|
||||||
+ 10 * 1000 * heartbeatIntervalSeconds;
|
+ 10 * 1000 * heartbeatIntervalSeconds;
|
||||||
// Effected block invalidate limit is the bigger value between
|
|
||||||
// value configured in hdfs-site.xml, and 20 * HB interval.
|
|
||||||
final int configuredBlockInvalidateLimit = conf.getInt(
|
final int configuredBlockInvalidateLimit = conf.getInt(
|
||||||
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY,
|
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY,
|
||||||
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
|
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
|
||||||
final int countedBlockInvalidateLimit = 20*(int)(heartbeatIntervalSeconds);
|
// Block invalidate limit also has some dependency on heartbeat interval.
|
||||||
this.blockInvalidateLimit = Math.max(countedBlockInvalidateLimit,
|
// Check setBlockInvalidateLimit().
|
||||||
configuredBlockInvalidateLimit);
|
setBlockInvalidateLimit(configuredBlockInvalidateLimit);
|
||||||
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
|
|
||||||
+ ": configured=" + configuredBlockInvalidateLimit
|
|
||||||
+ ", counted=" + countedBlockInvalidateLimit
|
|
||||||
+ ", effected=" + blockInvalidateLimit);
|
|
||||||
this.checkIpHostnameInRegistration = conf.getBoolean(
|
this.checkIpHostnameInRegistration = conf.getBoolean(
|
||||||
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
|
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT);
|
||||||
@ -2088,8 +2082,25 @@ private void setHeartbeatInterval(long intervalSeconds,
|
|||||||
this.heartbeatRecheckInterval = recheckInterval;
|
this.heartbeatRecheckInterval = recheckInterval;
|
||||||
this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
|
this.heartbeatExpireInterval = 2L * recheckInterval + 10 * 1000
|
||||||
* intervalSeconds;
|
* intervalSeconds;
|
||||||
this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
|
this.blockInvalidateLimit = getBlockInvalidateLimit(blockInvalidateLimit);
|
||||||
blockInvalidateLimit);
|
}
|
||||||
|
|
||||||
|
private int getBlockInvalidateLimitFromHBInterval() {
|
||||||
|
return 20 * (int) heartbeatIntervalSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getBlockInvalidateLimit(int configuredBlockInvalidateLimit) {
|
||||||
|
return Math.max(getBlockInvalidateLimitFromHBInterval(), configuredBlockInvalidateLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setBlockInvalidateLimit(int configuredBlockInvalidateLimit) {
|
||||||
|
final int countedBlockInvalidateLimit = getBlockInvalidateLimitFromHBInterval();
|
||||||
|
// Effected block invalidate limit is the bigger value between
|
||||||
|
// value configured in hdfs-site.xml, and 20 * HB interval.
|
||||||
|
this.blockInvalidateLimit = getBlockInvalidateLimit(configuredBlockInvalidateLimit);
|
||||||
|
LOG.info("{} : configured={}, counted={}, effected={}",
|
||||||
|
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, configuredBlockInvalidateLimit,
|
||||||
|
countedBlockInvalidateLimit, this.blockInvalidateLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -121,6 +121,7 @@
|
|||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
|
||||||
@ -337,7 +338,8 @@ public enum OperationCategory {
|
|||||||
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||||
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
||||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||||
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY));
|
DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
||||||
|
DFS_BLOCK_INVALIDATE_LIMIT_KEY));
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs namenode ["
|
private static final String USAGE = "Usage: hdfs namenode ["
|
||||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||||
@ -2210,6 +2212,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
|
|||||||
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))
|
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))
|
||||||
|| (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY))) {
|
|| (property.equals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY))) {
|
||||||
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
||||||
|
} else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
|
||||||
|
return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
property));
|
property));
|
||||||
@ -2434,6 +2438,27 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String reconfigureBlockInvalidateLimit(final DatanodeManager datanodeManager,
|
||||||
|
final String property, final String newVal) throws ReconfigurationException {
|
||||||
|
namesystem.writeLock();
|
||||||
|
try {
|
||||||
|
if (newVal == null) {
|
||||||
|
datanodeManager.setBlockInvalidateLimit(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
|
||||||
|
} else {
|
||||||
|
datanodeManager.setBlockInvalidateLimit(Integer.parseInt(newVal));
|
||||||
|
}
|
||||||
|
final String updatedBlockInvalidateLimit =
|
||||||
|
String.valueOf(datanodeManager.getBlockInvalidateLimit());
|
||||||
|
LOG.info("RECONFIGURE* changed blockInvalidateLimit to {}", updatedBlockInvalidateLimit);
|
||||||
|
return updatedBlockInvalidateLimit;
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||||
|
} finally {
|
||||||
|
namesystem.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override // ReconfigurableBase
|
@Override // ReconfigurableBase
|
||||||
protected Configuration getNewConf() {
|
protected Configuration getNewConf() {
|
||||||
return new HdfsConfiguration();
|
return new HdfsConfiguration();
|
||||||
|
@ -453,6 +453,37 @@ public void testReconfigureMaxSlowpeerCollectNodes()
|
|||||||
assertEquals(10, datanodeManager.getMaxSlowpeerCollectNodes());
|
assertEquals(10, datanodeManager.getMaxSlowpeerCollectNodes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockInvalidateLimit() throws ReconfigurationException {
|
||||||
|
final NameNode nameNode = cluster.getNameNode();
|
||||||
|
final DatanodeManager datanodeManager = nameNode.namesystem
|
||||||
|
.getBlockManager().getDatanodeManager();
|
||||||
|
|
||||||
|
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not correctly set",
|
||||||
|
customizedBlockInvalidateLimit, datanodeManager.getBlockInvalidateLimit());
|
||||||
|
|
||||||
|
try {
|
||||||
|
nameNode.reconfigureProperty(DFS_BLOCK_INVALIDATE_LIMIT_KEY, "non-numeric");
|
||||||
|
fail("Should not reach here");
|
||||||
|
} catch (ReconfigurationException e) {
|
||||||
|
assertEquals(
|
||||||
|
"Could not change property dfs.block.invalidate.limit from '500' to 'non-numeric'",
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
nameNode.reconfigureProperty(DFS_BLOCK_INVALIDATE_LIMIT_KEY, "2500");
|
||||||
|
|
||||||
|
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not honored after reconfiguration", 2500,
|
||||||
|
datanodeManager.getBlockInvalidateLimit());
|
||||||
|
|
||||||
|
nameNode.reconfigureProperty(DFS_HEARTBEAT_INTERVAL_KEY, "500");
|
||||||
|
|
||||||
|
// 20 * 500 (10000) > 2500
|
||||||
|
// Hence, invalid block limit should be reset to 10000
|
||||||
|
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY + " is not reconfigured correctly", 10000,
|
||||||
|
datanodeManager.getBlockInvalidateLimit());
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void shutDown() throws IOException {
|
public void shutDown() throws IOException {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
@ -433,15 +434,17 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
|
|||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("namenode", address, outs, errs);
|
getReconfigurableProperties("namenode", address, outs, errs);
|
||||||
assertEquals(16, outs.size());
|
assertEquals(17, outs.size());
|
||||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
|
||||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
|
||||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
|
||||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(3));
|
||||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(4));
|
||||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
|
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(5));
|
||||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
|
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(6));
|
||||||
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(8));
|
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(7));
|
||||||
|
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(8));
|
||||||
|
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(9));
|
||||||
assertEquals(errs.size(), 0);
|
assertEquals(errs.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user