HDFS-16314. Support to make dfs.namenode.block-placement-policy.exclude-slow-nodes.enabled reconfigurable (#3664)
Reviewed-by: Fei Hui <feihui.ustc@gmail.com> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
47ea0d734f
commit
c0f405a46b
@ -5360,4 +5360,14 @@ public void disableSPS() {
|
|||||||
public StoragePolicySatisfyManager getSPSManager() {
|
public StoragePolicySatisfyManager getSPSManager() {
|
||||||
return spsManager;
|
return spsManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setExcludeSlowNodesEnabled(boolean enable) {
|
||||||
|
placementPolicies.getPolicy(CONTIGUOUS).setExcludeSlowNodesEnabled(enable);
|
||||||
|
placementPolicies.getPolicy(STRIPED).setExcludeSlowNodesEnabled(enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public boolean getExcludeSlowNodesEnabled(BlockType blockType) {
|
||||||
|
return placementPolicies.getPolicy(blockType).getExcludeSlowNodesEnabled();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -261,4 +261,16 @@ public <T> void splitNodesWithRack(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the value used for excludeSlowNodesEnabled, which is set by
|
||||||
|
* {@code DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY}
|
||||||
|
* initially.
|
||||||
|
*
|
||||||
|
* @param enable true, we will filter out slow nodes
|
||||||
|
* when choosing targets for blocks, otherwise false not filter.
|
||||||
|
*/
|
||||||
|
public abstract void setExcludeSlowNodesEnabled(boolean enable);
|
||||||
|
|
||||||
|
public abstract boolean getExcludeSlowNodesEnabled();
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ private String getText() {
|
|||||||
protected double considerLoadFactor;
|
protected double considerLoadFactor;
|
||||||
private boolean preferLocalNode;
|
private boolean preferLocalNode;
|
||||||
private boolean dataNodePeerStatsEnabled;
|
private boolean dataNodePeerStatsEnabled;
|
||||||
private boolean excludeSlowNodesEnabled;
|
private volatile boolean excludeSlowNodesEnabled;
|
||||||
protected NetworkTopology clusterMap;
|
protected NetworkTopology clusterMap;
|
||||||
protected Host2NodesMap host2datanodeMap;
|
protected Host2NodesMap host2datanodeMap;
|
||||||
private FSClusterStats stats;
|
private FSClusterStats stats;
|
||||||
@ -1359,5 +1359,15 @@ protected Collection<DatanodeStorageInfo> pickupReplicaSet(
|
|||||||
void setPreferLocalNode(boolean prefer) {
|
void setPreferLocalNode(boolean prefer) {
|
||||||
this.preferLocalNode = prefer;
|
this.preferLocalNode = prefer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setExcludeSlowNodesEnabled(boolean enable) {
|
||||||
|
this.excludeSlowNodesEnabled = enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getExcludeSlowNodesEnabled() {
|
||||||
|
return excludeSlowNodesEnabled;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +189,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||||
|
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
|
||||||
@ -331,7 +333,8 @@ public enum OperationCategory {
|
|||||||
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||||
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
|
||||||
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
DFS_IMAGE_PARALLEL_LOAD_KEY,
|
||||||
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY));
|
DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY,
|
||||||
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY));
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs namenode ["
|
private static final String USAGE = "Usage: hdfs namenode ["
|
||||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||||
@ -2200,7 +2203,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
|
|||||||
return newVal;
|
return newVal;
|
||||||
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
} else if (property.equals(DFS_IMAGE_PARALLEL_LOAD_KEY)) {
|
||||||
return reconfigureParallelLoad(newVal);
|
return reconfigureParallelLoad(newVal);
|
||||||
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
|
} else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)
|
||||||
|
|| (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY))) {
|
||||||
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
|
||||||
} else {
|
} else {
|
||||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||||
@ -2234,7 +2238,7 @@ private String reconfReplicationParameters(final String newVal,
|
|||||||
newSetting = bm.getBlocksReplWorkMultiplier();
|
newSetting = bm.getBlocksReplWorkMultiplier();
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Unexpected property " +
|
throw new IllegalArgumentException("Unexpected property " +
|
||||||
property + "in reconfReplicationParameters");
|
property + " in reconfReplicationParameters");
|
||||||
}
|
}
|
||||||
LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
|
LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
|
||||||
return String.valueOf(newSetting);
|
return String.valueOf(newSetting);
|
||||||
@ -2390,15 +2394,24 @@ String reconfigureParallelLoad(String newVal) {
|
|||||||
|
|
||||||
String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
|
String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
|
||||||
final String property, final String newVal) throws ReconfigurationException {
|
final String property, final String newVal) throws ReconfigurationException {
|
||||||
|
BlockManager bm = namesystem.getBlockManager();
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
boolean enable;
|
boolean enable;
|
||||||
try {
|
try {
|
||||||
if (newVal == null) {
|
if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY)) {
|
||||||
enable = DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT;
|
enable = (newVal == null ? DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_DEFAULT :
|
||||||
|
Boolean.parseBoolean(newVal));
|
||||||
|
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
|
||||||
|
} else if (property.equals(
|
||||||
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) {
|
||||||
|
enable = (newVal == null ?
|
||||||
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT :
|
||||||
|
Boolean.parseBoolean(newVal));
|
||||||
|
bm.setExcludeSlowNodesEnabled(enable);
|
||||||
} else {
|
} else {
|
||||||
enable = Boolean.parseBoolean(newVal);
|
throw new IllegalArgumentException("Unexpected property " +
|
||||||
|
property + " in reconfigureSlowNodesParameters");
|
||||||
}
|
}
|
||||||
datanodeManager.setAvoidSlowDataNodesForReadEnabled(enable);
|
|
||||||
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||||
return Boolean.toString(enable);
|
return Boolean.toString(enable);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
|
@ -34,10 +34,12 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
|
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
@ -52,6 +54,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
|
||||||
|
|
||||||
public class TestNameNodeReconfigure {
|
public class TestNameNodeReconfigure {
|
||||||
@ -399,8 +402,8 @@ public void testEnableParallelLoadAfterReconfigured()
|
|||||||
public void testEnableSlowNodesParametersAfterReconfigured()
|
public void testEnableSlowNodesParametersAfterReconfigured()
|
||||||
throws ReconfigurationException {
|
throws ReconfigurationException {
|
||||||
final NameNode nameNode = cluster.getNameNode();
|
final NameNode nameNode = cluster.getNameNode();
|
||||||
final DatanodeManager datanodeManager = nameNode.namesystem
|
final BlockManager blockManager = nameNode.namesystem.getBlockManager();
|
||||||
.getBlockManager().getDatanodeManager();
|
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
|
||||||
|
|
||||||
// By default, avoidSlowDataNodesForRead is false.
|
// By default, avoidSlowDataNodesForRead is false.
|
||||||
assertEquals(false, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
assertEquals(false, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
||||||
@ -410,6 +413,21 @@ public void testEnableSlowNodesParametersAfterReconfigured()
|
|||||||
|
|
||||||
// After reconfigured, avoidSlowDataNodesForRead is true.
|
// After reconfigured, avoidSlowDataNodesForRead is true.
|
||||||
assertEquals(true, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
assertEquals(true, datanodeManager.getEnableAvoidSlowDataNodesForRead());
|
||||||
|
|
||||||
|
// By default, excludeSlowNodesEnabled is false.
|
||||||
|
assertEquals(false, blockManager.
|
||||||
|
getExcludeSlowNodesEnabled(BlockType.CONTIGUOUS));
|
||||||
|
assertEquals(false, blockManager.
|
||||||
|
getExcludeSlowNodesEnabled(BlockType.STRIPED));
|
||||||
|
|
||||||
|
nameNode.reconfigureProperty(
|
||||||
|
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, Boolean.toString(true));
|
||||||
|
|
||||||
|
// After reconfigured, excludeSlowNodesEnabled is true.
|
||||||
|
assertEquals(true, blockManager.
|
||||||
|
getExcludeSlowNodesEnabled(BlockType.CONTIGUOUS));
|
||||||
|
assertEquals(true, blockManager.
|
||||||
|
getExcludeSlowNodesEnabled(BlockType.STRIPED));
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.text.TextStringBuilder;
|
import org.apache.commons.text.TextStringBuilder;
|
||||||
@ -431,13 +432,14 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
|
|||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("namenode", address, outs, errs);
|
getReconfigurableProperties("namenode", address, outs, errs);
|
||||||
assertEquals(14, outs.size());
|
assertEquals(15, outs.size());
|
||||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
|
||||||
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
|
||||||
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
|
||||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(4));
|
||||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(5));
|
||||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(6));
|
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(6));
|
||||||
|
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(7));
|
||||||
assertEquals(errs.size(), 0);
|
assertEquals(errs.size(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user