HDFS-16888. BlockManager#maxReplicationStreams, replicationStreamsHardLimit, blocksReplWorkMultiplier and PendingReconstructionBlocks#timeout should be volatile (#5296)
Reviewed-by: Tao Li <tomscut@apache.org> Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
9d47108b50
commit
88c8ac750d
@ -395,12 +395,12 @@ public int getPendingSPSPaths() {
|
||||
* The maximum number of outgoing replication streams a given node should have
|
||||
* at one time considering all but the highest priority replications needed.
|
||||
*/
|
||||
int maxReplicationStreams;
|
||||
private volatile int maxReplicationStreams;
|
||||
/**
|
||||
* The maximum number of outgoing replication streams a given node should have
|
||||
* at one time.
|
||||
*/
|
||||
int replicationStreamsHardLimit;
|
||||
private volatile int replicationStreamsHardLimit;
|
||||
/** Minimum copies needed or else write is disallowed */
|
||||
public final short minReplication;
|
||||
/** Default number of replicas */
|
||||
@ -409,7 +409,7 @@ public int getPendingSPSPaths() {
|
||||
final int maxCorruptFilesReturned;
|
||||
|
||||
final float blocksInvalidateWorkPct;
|
||||
private int blocksReplWorkMultiplier;
|
||||
private volatile int blocksReplWorkMultiplier;
|
||||
|
||||
// whether or not to issue block encryption keys.
|
||||
final boolean encryptDataTransfer;
|
||||
@ -1017,12 +1017,19 @@ static private void ensurePositiveInt(int val, String key) {
|
||||
*
|
||||
* @param newVal - Must be a positive non-zero integer.
|
||||
*/
|
||||
public void setMaxReplicationStreams(int newVal) {
|
||||
ensurePositiveInt(newVal,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
|
||||
@VisibleForTesting
|
||||
public void setMaxReplicationStreams(int newVal, boolean ensurePositiveInt) {
|
||||
if (ensurePositiveInt) {
|
||||
ensurePositiveInt(newVal,
|
||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
|
||||
}
|
||||
maxReplicationStreams = newVal;
|
||||
}
|
||||
|
||||
public void setMaxReplicationStreams(int newVal) {
|
||||
setMaxReplicationStreams(newVal, true);
|
||||
}
|
||||
|
||||
/** Returns the current setting for maxReplicationStreamsHardLimit, set by
|
||||
* {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
|
||||
*
|
||||
|
@ -59,7 +59,7 @@ class PendingReconstructionBlocks {
|
||||
// It might take anywhere between 5 to 10 minutes before
|
||||
// a request is timed out.
|
||||
//
|
||||
private long timeout =
|
||||
private volatile long timeout =
|
||||
DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000;
|
||||
private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;
|
||||
|
||||
|
@ -44,7 +44,7 @@ public class BlockManagerTestUtil {
|
||||
|
||||
public static void setNodeReplicationLimit(final BlockManager blockManager,
|
||||
final int limit) {
|
||||
blockManager.maxReplicationStreams = limit;
|
||||
blockManager.setMaxReplicationStreams(limit, false);
|
||||
}
|
||||
|
||||
/** @return the datanode descriptor for the given the given storageID. */
|
||||
|
@ -677,8 +677,8 @@ private LinkedListMultimap<DatanodeStorageInfo, BlockTargetPair> getAllPendingRe
|
||||
*/
|
||||
@Test
|
||||
public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
|
||||
bm.maxReplicationStreams = 0;
|
||||
bm.replicationStreamsHardLimit = 1;
|
||||
bm.setMaxReplicationStreams(0, false);
|
||||
bm.setReplicationStreamsHardLimit(1);
|
||||
|
||||
long blockId = 42; // arbitrary
|
||||
Block aBlock = new Block(blockId, 0, 0);
|
||||
@ -735,7 +735,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testChooseSrcDatanodesWithDupEC() throws Exception {
|
||||
bm.maxReplicationStreams = 4;
|
||||
bm.setMaxReplicationStreams(4, false);
|
||||
|
||||
long blockId = -9223372036854775776L; // real ec block id
|
||||
Block aBlock = new Block(blockId, 0, 0);
|
||||
@ -895,7 +895,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
|
||||
assertNotNull(work);
|
||||
|
||||
// simulate the 2 nodes reach maxReplicationStreams
|
||||
for(int i = 0; i < bm.maxReplicationStreams; i++){
|
||||
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
|
||||
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
|
||||
ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
@ -939,7 +939,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
|
||||
assertNotNull(work);
|
||||
|
||||
// simulate the 1 node reaches maxReplicationStreams
|
||||
for(int i = 0; i < bm.maxReplicationStreams; i++){
|
||||
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
|
||||
ds2.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
|
||||
@ -948,7 +948,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
|
||||
assertNotNull(work);
|
||||
|
||||
// simulate the 1 more node reaches maxReplicationStreams
|
||||
for(int i = 0; i < bm.maxReplicationStreams; i++){
|
||||
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
|
||||
ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
|
||||
}
|
||||
|
||||
@ -997,7 +997,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
|
||||
DatanodeDescriptor[] dummyDDArray = new DatanodeDescriptor[]{dummyDD};
|
||||
DatanodeStorageInfo[] dummyDSArray = new DatanodeStorageInfo[]{ds1};
|
||||
// Simulate the 2 nodes reach maxReplicationStreams.
|
||||
for(int i = 0; i < bm.maxReplicationStreams; i++){ //Add some dummy EC reconstruction task.
|
||||
for(int i = 0; i < bm.getMaxReplicationStreams(); i++){ //Add some dummy EC reconstruction task.
|
||||
ds3.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
|
||||
dummyDSArray, new byte[0], new byte[0], ecPolicy);
|
||||
ds4.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
|
||||
@ -1011,8 +1011,8 @@ public void testSkipReconstructionWithManyBusyNodes3() {
|
||||
|
||||
@Test
|
||||
public void testFavorDecomUntilHardLimit() throws Exception {
|
||||
bm.maxReplicationStreams = 0;
|
||||
bm.replicationStreamsHardLimit = 1;
|
||||
bm.setMaxReplicationStreams(0, false);
|
||||
bm.setReplicationStreamsHardLimit(1);
|
||||
|
||||
long blockId = 42; // arbitrary
|
||||
Block aBlock = new Block(blockId, 0, 0);
|
||||
|
@ -159,9 +159,9 @@ public void testNumberOfBlocksToBeReplicated() throws Exception {
|
||||
|
||||
BlockManagerTestUtil.updateState(bm);
|
||||
assertTrue("The number of blocks to be replicated should be less than "
|
||||
+ "or equal to " + bm.replicationStreamsHardLimit,
|
||||
+ "or equal to " + bm.getReplicationStreamsHardLimit(),
|
||||
secondDn.getNumberOfBlocksToBeReplicated()
|
||||
<= bm.replicationStreamsHardLimit);
|
||||
<= bm.getReplicationStreamsHardLimit());
|
||||
DFSTestUtil.verifyClientStats(conf, cluster);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
|
Loading…
Reference in New Issue
Block a user