diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a70c7b9c00..a6b60cf8bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -386,12 +386,12 @@ public long getTotalECBlockGroups() {
    * The maximum number of outgoing replication streams a given node should have
    * at one time considering all but the highest priority replications needed.
    */
-  int maxReplicationStreams;
+  private volatile int maxReplicationStreams;
   /**
    * The maximum number of outgoing replication streams a given node should have
    * at one time.
    */
-  int replicationStreamsHardLimit;
+  private volatile int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -400,7 +400,7 @@ public long getTotalECBlockGroups() {
 
   final int maxCorruptFilesReturned;
   final float blocksInvalidateWorkPct;
-  private int blocksReplWorkMultiplier;
+  private volatile int blocksReplWorkMultiplier;
 
   // whether or not to issue block encryption keys.
   final boolean encryptDataTransfer;
@@ -974,12 +974,19 @@ static private void ensurePositiveInt(int val, String key) {
    *
    * @param newVal - Must be a positive non-zero integer.
    */
-  public void setMaxReplicationStreams(int newVal) {
-    ensurePositiveInt(newVal,
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
+  @VisibleForTesting
+  public void setMaxReplicationStreams(int newVal, boolean ensurePositiveInt) {
+    if (ensurePositiveInt) {
+      ensurePositiveInt(newVal,
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
+    }
     maxReplicationStreams = newVal;
   }
 
+  public void setMaxReplicationStreams(int newVal) {
+    setMaxReplicationStreams(newVal, true);
+  }
+
   /** Returns the current setting for maxReplicationStreamsHardLimit, set by
    * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
index 81495ebaf2..322d2696e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReconstructionBlocks.java
@@ -59,7 +59,7 @@ class PendingReconstructionBlocks {
   // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
   //
-  private long timeout =
+  private volatile long timeout =
       DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT * 1000;
   private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index ed0548443c..ef0c0a6e9d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -44,7 +44,7 @@ public class BlockManagerTestUtil {
 
   public static void setNodeReplicationLimit(final BlockManager blockManager,
       final int limit) {
-    blockManager.maxReplicationStreams = limit;
+    blockManager.setMaxReplicationStreams(limit, false);
   }
 
   /** @return the datanode descriptor for the given the given storageID. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index cfa707f4b0..72792e3173 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -677,8 +677,8 @@ private LinkedListMultimap getAllPendingRe
    */
   @Test
   public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
-    bm.maxReplicationStreams = 0;
-    bm.replicationStreamsHardLimit = 1;
+    bm.setMaxReplicationStreams(0, false);
+    bm.setReplicationStreamsHardLimit(1);
 
     long blockId = 42;         // arbitrary
     Block aBlock = new Block(blockId, 0, 0);
@@ -735,7 +735,7 @@ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
 
   @Test
   public void testChooseSrcDatanodesWithDupEC() throws Exception {
-    bm.maxReplicationStreams = 4;
+    bm.setMaxReplicationStreams(4, false);
 
     long blockId = -9223372036854775776L; // real ec block id
     Block aBlock = new Block(blockId, 0, 0);
@@ -895,7 +895,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
     assertNotNull(work);
 
     // simulate the 2 nodes reach maxReplicationStreams
-    for(int i = 0; i < bm.maxReplicationStreams; i++){
+    for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
       ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
       ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
     }
@@ -939,7 +939,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
     assertNotNull(work);
 
     // simulate the 1 node reaches maxReplicationStreams
-    for(int i = 0; i < bm.maxReplicationStreams; i++){
+    for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
       ds2.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
     }
 
@@ -948,7 +948,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
     assertNotNull(work);
 
     // simulate the 1 more node reaches maxReplicationStreams
-    for(int i = 0; i < bm.maxReplicationStreams; i++){
+    for(int i = 0; i < bm.getMaxReplicationStreams(); i++){
       ds3.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
     }
 
@@ -997,7 +997,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
     DatanodeDescriptor[] dummyDDArray = new DatanodeDescriptor[]{dummyDD};
     DatanodeStorageInfo[] dummyDSArray = new DatanodeStorageInfo[]{ds1};
     // Simulate the 2 nodes reach maxReplicationStreams.
-    for(int i = 0; i < bm.maxReplicationStreams; i++){ //Add some dummy EC reconstruction task.
+    for(int i = 0; i < bm.getMaxReplicationStreams(); i++){ //Add some dummy EC reconstruction task.
       ds3.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
           dummyDSArray, new byte[0], new byte[0], ecPolicy);
       ds4.getDatanodeDescriptor().addBlockToBeErasureCoded(dummyBlock, dummyDDArray,
@@ -1011,8 +1011,8 @@ public void testSkipReconstructionWithManyBusyNodes3() {
 
   @Test
   public void testFavorDecomUntilHardLimit() throws Exception {
-    bm.maxReplicationStreams = 0;
-    bm.replicationStreamsHardLimit = 1;
+    bm.setMaxReplicationStreams(0, false);
+    bm.setReplicationStreamsHardLimit(1);
 
     long blockId = 42;         // arbitrary
     Block aBlock = new Block(blockId, 0, 0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
index 0487c3f973..04d2572b39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
@@ -159,9 +159,9 @@ public void testNumberOfBlocksToBeReplicated() throws Exception {
       BlockManagerTestUtil.updateState(bm);
 
       assertTrue("The number of blocks to be replicated should be less than "
-          + "or equal to " + bm.replicationStreamsHardLimit,
+          + "or equal to " + bm.getReplicationStreamsHardLimit(),
          secondDn.getNumberOfBlocksToBeReplicated()
-          <= bm.replicationStreamsHardLimit);
+          <= bm.getReplicationStreamsHardLimit());
       DFSTestUtil.verifyClientStats(conf, cluster);
     } finally {
       cluster.shutdown();
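For reviewers, below is a minimal, hypothetical JUnit-style sketch (not part of the patch) of how a test is expected to drive the new setter instead of assigning the formerly package-private field directly. The class name and the way `bm` is obtained are illustrative assumptions; only setMaxReplicationStreams(int, boolean), setMaxReplicationStreams(int) and getMaxReplicationStreams() come from this change or the existing BlockManager API used elsewhere in the patch.

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.junit.Test;

public class TestMaxReplicationStreamsSetterSketch {
  // Assumed to be wired to a running NameNode's BlockManager in a @Before
  // method (e.g. via MiniDFSCluster); shown as a plain field for brevity.
  private BlockManager bm;

  @Test
  public void testSetterBypassesValidationOnlyWhenAsked() {
    // A test may force a limit of 0 (normally rejected) by skipping the
    // positive-value check with the boolean flag added in this patch.
    bm.setMaxReplicationStreams(0, false);
    assertEquals(0, bm.getMaxReplicationStreams());

    // Production callers keep using the one-argument setter, which still
    // validates that the new value is a positive non-zero integer.
    bm.setMaxReplicationStreams(2);
    assertEquals(2, bm.getMaxReplicationStreams());
  }
}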