HDFS-17205. HdfsServerConstants.MIN_BLOCKS_FOR_WRITE should be configurable (#6112). Contributed by Haiyang Hu
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org> Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
parent
57100bba1b
commit
4c408a557f
@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReservedSpaceCalculator;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
@ -1270,6 +1271,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT =
|
||||
false;
|
||||
|
||||
public static final String DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY =
|
||||
"dfs.namenode.block-placement.min-blocks-for.write";
|
||||
public static final int DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT =
|
||||
HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
||||
|
||||
public static final String DFS_NAMENODE_GC_TIME_MONITOR_ENABLE =
|
||||
"dfs.namenode.gc.time.monitor.enable";
|
||||
public static final boolean DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT =
|
||||
|
@ -5689,4 +5689,16 @@ public class BlockManager implements BlockStatsMXBean {
|
||||
public boolean getExcludeSlowNodesEnabled(BlockType blockType) {
|
||||
return placementPolicies.getPolicy(blockType).getExcludeSlowNodesEnabled();
|
||||
}
|
||||
|
||||
public void setMinBlocksForWrite(int minBlocksForWrite) {
|
||||
ensurePositiveInt(minBlocksForWrite,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY);
|
||||
placementPolicies.getPolicy(CONTIGUOUS).setMinBlocksForWrite(minBlocksForWrite);
|
||||
placementPolicies.getPolicy(STRIPED).setMinBlocksForWrite(minBlocksForWrite);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getMinBlocksForWrite(BlockType blockType) {
|
||||
return placementPolicies.getPolicy(blockType).getMinBlocksForWrite();
|
||||
}
|
||||
}
|
||||
|
@ -274,4 +274,14 @@ public abstract class BlockPlacementPolicy {
|
||||
public abstract void setExcludeSlowNodesEnabled(boolean enable);
|
||||
|
||||
public abstract boolean getExcludeSlowNodesEnabled();
|
||||
|
||||
/**
|
||||
* Updates the value used for minBlocksForWrite, which is set by
|
||||
* {@code DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY}.
|
||||
*
|
||||
* @param minBlocksForWrite the minimum number of blocks required for write operations.
|
||||
*/
|
||||
public abstract void setMinBlocksForWrite(int minBlocksForWrite);
|
||||
|
||||
public abstract int getMinBlocksForWrite();
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
|
||||
import static org.apache.hadoop.util.Time.monotonicNow;
|
||||
@ -111,7 +113,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||
private FSClusterStats stats;
|
||||
protected long heartbeatInterval; // interval for DataNode heartbeats
|
||||
private long staleInterval; // interval used to identify stale DataNodes
|
||||
|
||||
private volatile int minBlocksForWrite; // minimum number of blocks required for write operations.
|
||||
|
||||
/**
|
||||
* A miss of that many heartbeats is tolerated for replica deletion policy.
|
||||
*/
|
||||
@ -161,6 +164,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||
this.excludeSlowNodesEnabled = conf.getBoolean(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_DEFAULT);
|
||||
this.minBlocksForWrite = conf.getInt(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -959,7 +965,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||
List<DatanodeStorageInfo> results,
|
||||
StorageType storageType) {
|
||||
DatanodeStorageInfo storage =
|
||||
dnd.chooseStorage4Block(storageType, blockSize);
|
||||
dnd.chooseStorage4Block(storageType, blockSize, minBlocksForWrite);
|
||||
if (storage != null) {
|
||||
results.add(storage);
|
||||
} else {
|
||||
@ -1386,4 +1392,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
|
||||
public boolean getExcludeSlowNodesEnabled() {
|
||||
return excludeSlowNodesEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMinBlocksForWrite(int minBlocksForWrite) {
|
||||
this.minBlocksForWrite = minBlocksForWrite;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMinBlocksForWrite() {
|
||||
return minBlocksForWrite;
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
@ -812,11 +811,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||
*
|
||||
* @param t requested storage type
|
||||
* @param blockSize requested block size
|
||||
* @param minBlocksForWrite requested the minimum number of blocks
|
||||
*/
|
||||
public DatanodeStorageInfo chooseStorage4Block(StorageType t,
|
||||
long blockSize) {
|
||||
final long requiredSize =
|
||||
blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
|
||||
long blockSize, int minBlocksForWrite) {
|
||||
final long requiredSize = blockSize * minBlocksForWrite;
|
||||
final long scheduledSize = blockSize * getBlocksScheduled(t);
|
||||
long remaining = 0;
|
||||
DatanodeStorageInfo storage = null;
|
||||
|
@ -42,6 +42,8 @@ import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public interface HdfsServerConstants {
|
||||
// Will be set by
|
||||
// {@code DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY}.
|
||||
int MIN_BLOCKS_FOR_WRITE = 1;
|
||||
|
||||
long LEASE_RECOVER_PERIOD = 10 * 1000; // in ms
|
||||
|
@ -135,6 +135,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_I
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
|
||||
@ -362,7 +364,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||
DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
|
||||
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
|
||||
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
|
||||
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK));
|
||||
DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY));
|
||||
|
||||
private static final String USAGE = "Usage: hdfs namenode ["
|
||||
+ StartupOption.BACKUP.getName() + "] | \n\t["
|
||||
@ -2362,6 +2365,8 @@ public class NameNode extends ReconfigurableBase implements
|
||||
(property.equals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK))) {
|
||||
return reconfigureDecommissionBackoffMonitorParameters(datanodeManager, property,
|
||||
newVal);
|
||||
} else if (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY)) {
|
||||
return reconfigureMinBlocksForWrite(property, newVal);
|
||||
} else {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property));
|
||||
@ -2671,6 +2676,18 @@ public class NameNode extends ReconfigurableBase implements
|
||||
}
|
||||
}
|
||||
|
||||
private String reconfigureMinBlocksForWrite(String property, String newValue)
|
||||
throws ReconfigurationException {
|
||||
try {
|
||||
int newSetting = adjustNewVal(
|
||||
DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT, newValue);
|
||||
namesystem.getBlockManager().setMinBlocksForWrite(newSetting);
|
||||
return String.valueOf(newSetting);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new ReconfigurationException(property, newValue, getConf().get(property), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // ReconfigurableBase
|
||||
protected Configuration getNewConf() {
|
||||
return new HdfsConfiguration();
|
||||
|
@ -2445,6 +2445,15 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.block-placement.min-blocks-for.write</name>
|
||||
<value>1</value>
|
||||
<description>
|
||||
Setting the minimum number of blocks for write operations is used to calculate the space required
|
||||
for write operations.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.max.slowpeer.collect.nodes</name>
|
||||
<value>5</value>
|
||||
|
@ -1734,4 +1734,27 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||
assertFalse(dnManager.shouldAvoidStaleDataNodesForWrite());
|
||||
resetHeartbeatForStorages();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChosenFailureForNotEnoughStorageSpace() {
|
||||
final LogVerificationAppender appender = new LogVerificationAppender();
|
||||
final Logger logger = Logger.getRootLogger();
|
||||
logger.addAppender(appender);
|
||||
|
||||
// Set all datanode storage remaining space is 1 * BLOCK_SIZE.
|
||||
for(int i = 0; i < dataNodes.length; i++) {
|
||||
updateHeartbeatWithUsage(dataNodes[i], BLOCK_SIZE, 0L, BLOCK_SIZE,
|
||||
0L, 0L, 0L, 0, 0);
|
||||
}
|
||||
|
||||
// Set chooseStorage4Block required the minimum number of blocks is 2.
|
||||
replicator.setMinBlocksForWrite(2);
|
||||
DatanodeStorageInfo[] targets = chooseTarget(1, dataNodes[1],
|
||||
new ArrayList<DatanodeStorageInfo>(), null);
|
||||
assertEquals(0, targets.length);
|
||||
assertNotEquals(0,
|
||||
appender.countLinesWithMessage("NOT_ENOUGH_STORAGE_SPACE"));
|
||||
|
||||
resetHeartbeatForStorages();
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminBackoffMonitor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminMonitorInterface;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.junit.Test;
|
||||
import org.junit.Before;
|
||||
import org.junit.After;
|
||||
@ -654,6 +655,52 @@ public class TestNameNodeReconfigure {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconfigureMinBlocksForWrite() throws Exception {
|
||||
final NameNode nameNode = cluster.getNameNode(0);
|
||||
final BlockManager bm = nameNode.getNamesystem().getBlockManager();
|
||||
String key = DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
|
||||
int defaultVal = DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_DEFAULT;
|
||||
|
||||
// Ensure we cannot set any of the parameters negative
|
||||
ReconfigurationException reconfigurationException =
|
||||
LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
() -> nameNode.reconfigurePropertyImpl(key, "-20"));
|
||||
assertTrue(reconfigurationException.getCause() instanceof IllegalArgumentException);
|
||||
assertEquals(key + " = '-20' is invalid. It should be a "
|
||||
+"positive, non-zero integer value.", reconfigurationException.getCause().getMessage());
|
||||
|
||||
// Ensure none of the values were updated from the defaults
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
|
||||
|
||||
reconfigurationException = LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
() -> nameNode.reconfigurePropertyImpl(key, "0"));
|
||||
assertTrue(reconfigurationException.getCause() instanceof IllegalArgumentException);
|
||||
assertEquals(key + " = '0' is invalid. It should be a "
|
||||
+"positive, non-zero integer value.", reconfigurationException.getCause().getMessage());
|
||||
|
||||
// Ensure none of the values were updated from the defaults
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
|
||||
|
||||
|
||||
// Ensure none of the parameters can be set to a string value
|
||||
reconfigurationException = LambdaTestUtils.intercept(ReconfigurationException.class,
|
||||
() -> nameNode.reconfigurePropertyImpl(key, "str"));
|
||||
assertTrue(reconfigurationException.getCause() instanceof NumberFormatException);
|
||||
|
||||
// Ensure none of the values were updated from the defaults
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
|
||||
assertEquals(defaultVal, bm.getMinBlocksForWrite(BlockType.STRIPED));
|
||||
|
||||
nameNode.reconfigurePropertyImpl(key, "3");
|
||||
|
||||
// Ensure none of the values were updated from the new value.
|
||||
assertEquals(3, bm.getMinBlocksForWrite(BlockType.CONTIGUOUS));
|
||||
assertEquals(3, bm.getMinBlocksForWrite(BlockType.STRIPED));
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDown() throws IOException {
|
||||
if (cluster != null) {
|
||||
|
@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABL
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
|
||||
@ -441,7 +442,7 @@ public class TestDFSAdmin {
|
||||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("namenode", address, outs, errs);
|
||||
assertEquals(22, outs.size());
|
||||
assertEquals(23, outs.size());
|
||||
assertTrue(outs.get(0).contains("Reconfigurable properties:"));
|
||||
assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
|
||||
assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
|
||||
@ -452,10 +453,11 @@ public class TestDFSAdmin {
|
||||
assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(7));
|
||||
assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(8));
|
||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(9));
|
||||
assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(10));
|
||||
assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(11));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(12));
|
||||
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(13));
|
||||
assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY, outs.get(10));
|
||||
assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK, outs.get(11));
|
||||
assertEquals(DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT, outs.get(12));
|
||||
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(13));
|
||||
assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(14));
|
||||
assertEquals(errs.size(), 0);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user