HDFS-16888. BlockManager#maxReplicationStreams, replicationStreamsHardLimit, blocksReplWorkMultiplier and PendingReconstructionBlocks#timeout should be volatile (#5296)

Reviewed-by: Tao Li <tomscut@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit 88c8ac750d)
This commit is contained in:
huhaiyang 2023-01-31 16:46:38 +08:00 committed by Takanobu Asanuma
parent 73f3196db5
commit 80b42625cf
5 changed files with 26 additions and 19 deletions

View File

@ -386,12 +386,12 @@ public long getTotalECBlockGroups() {
* The maximum number of outgoing replication streams a given node should have
* at one time considering all but the highest priority replications needed.
*/
int maxReplicationStreams;
private volatile int maxReplicationStreams;
/**
* The maximum number of outgoing replication streams a given node should have
* at one time.
*/
int replicationStreamsHardLimit;
private volatile int replicationStreamsHardLimit;
/** Minimum copies needed or else write is disallowed */
public final short minReplication;
/** Default number of replicas */
@ -400,7 +400,7 @@ public long getTotalECBlockGroups() {
final int maxCorruptFilesReturned;
final float blocksInvalidateWorkPct;
private int blocksReplWorkMultiplier;
private volatile int blocksReplWorkMultiplier;
// whether or not to issue block encryption keys.
final boolean encryptDataTransfer;
@ -974,12 +974,19 @@ static private void ensurePositiveInt(int val, String key) {
*
* @param newVal - Must be a positive non-zero integer.
*/
public void setMaxReplicationStreams(int newVal) {
ensurePositiveInt(newVal,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
@VisibleForTesting
public void setMaxReplicationStreams(int newVal, boolean ensurePositiveInt) {
if (ensurePositiveInt) {
ensurePositiveInt(newVal,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
}
maxReplicationStreams = newVal;
}
public void setMaxReplicationStreams(int newVal) {
setMaxReplicationStreams(newVal, true);
}
/** Returns the current setting for maxReplicationStreamsHardLimit, set by
* {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
*

View File

@ -59,7 +59,7 @@ class PendingReconstructionBlocks {
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long timeout =
private volatile long timeout =
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000;
private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;

View File

@ -44,7 +44,7 @@ public class BlockManagerTestUtil {
public static void setNodeReplicationLimit(final BlockManager blockManager,
final int limit) {
blockManager.maxReplicationStreams = limit;
blockManager.setMaxReplicationStreams(limit, false);
}
/** @return the datanode descriptor for the given the given storageID. */

View File

@ -677,8 +677,8 @@ private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingRe
*/
@Test
public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
bm.maxReplicationStreams = 0;
bm.replicationStreamsHardLimit = 1;
bm.setMaxReplicationStreams(0, false);
bm.setReplicationStreamsHardLimit(1);
long blockId = 42; // arbitrary
Block aBlock = new Block(blockId, 0, 0);
@ -735,7 +735,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
@Test
public void testChooseSrcDatanodesWithDupEC() throws Exception {
bm.maxReplicationStreams = 4;
bm.setMaxReplicationStreams(4, false);
long blockId = -9223372036854775776L; // real ec block id
Block aBlock = new Block(blockId, 0, 0);
@ -895,7 +895,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
assertNotNull(work);
// simulate the 2 nodes reach maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}
@ -939,7 +939,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
assertNotNull(work);
// simulate the 1 node reaches maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
ds2.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}
@ -948,7 +948,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
assertNotNull(work);
// simulate the 1 more node reaches maxReplicationStreams
for(int i = 0; i < bm.maxReplicationStreams; i++){
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
}
@ -997,7 +997,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
DatanodeDescriptor[] dummyDDArray = new DatanodeDescriptor[]{dummyDD};
DatanodeStorageInfo[] dummyDSArray = new DatanodeStorageInfo[]{ds1};
// Simulate the 2 nodes reach maxReplicationStreams.
for(int i = 0; i < bm.maxReplicationStreams; i++){ //Add some dummy EC reconstruction task.
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){ //Add some dummy EC reconstruction task.
ds3.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
dummyDSArray, new byte[0], new byte[0], ecPolicy);
ds4.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
@ -1011,8 +1011,8 @@ public void testSkipReconstructionWithManyBusyNodes3() {
@Test
public void testFavorDecomUntilHardLimit() throws Exception {
bm.maxReplicationStreams = 0;
bm.replicationStreamsHardLimit = 1;
bm.setMaxReplicationStreams(0, false);
bm.setReplicationStreamsHardLimit(1);
long blockId = 42; // arbitrary
Block aBlock = new Block(blockId, 0, 0);

View File

@ -159,9 +159,9 @@ public void testNumberOfBlocksToBeReplicated() throws Exception {
BlockManagerTestUtil.updateState(bm);
assertTrue("The number of blocks to be replicated should be less than "
+ "or equal to " + bm.replicationStreamsHardLimit,
+ "or equal to " + bm.getReplicationStreamsHardLimit(),
secondDn.getNumberOfBlocksToBeReplicated()
<= bm.replicationStreamsHardLimit);
<= bm.getReplicationStreamsHardLimit());
DFSTestUtil.verifyClientStats(conf, cluster);
} finally {
cluster.shutdown();