diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 0db6c73e27..cdde48ce57 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -641,10 +641,16 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
    * conditions.
    */
   public static class SleepAnswer implements Answer {
+    private final int minSleepTime;
     private final int maxSleepTime;
     private static Random r = new Random();
-    
+
     public SleepAnswer(int maxSleepTime) {
+      this(0, maxSleepTime);
+    }
+
+    public SleepAnswer(int minSleepTime, int maxSleepTime) {
+      this.minSleepTime = minSleepTime;
       this.maxSleepTime = maxSleepTime;
     }
 
@@ -652,7 +658,7 @@ public SleepAnswer(int maxSleepTime) {
     public Object answer(InvocationOnMock invocation) throws Throwable {
       boolean interrupted = false;
       try {
-        Thread.sleep(r.nextInt(maxSleepTime));
+        Thread.sleep(r.nextInt(maxSleepTime - minSleepTime) + minSleepTime);
       } catch (InterruptedException ie) {
         interrupted = true;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4986027b04..1cdb159d98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -164,6 +164,8 @@ public class BlockManager implements BlockStatsMXBean {
   private static final String QUEUE_REASON_FUTURE_GENSTAMP =
     "generation stamp is in the future";
 
+  private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
+
   private final Namesystem namesystem;
 
   private final BlockManagerSafeMode bmSafeMode;
@@ -353,6 +355,9 @@ public long getTotalECBlockGroups() {
   @VisibleForTesting
   final PendingReconstructionBlocks pendingReconstruction;
 
+  /** Stores information about block recovery attempts. */
+  private final PendingRecoveryBlocks pendingRecoveryBlocks;
+
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
   /**
@@ -549,6 +554,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
     }
     this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
 
+    long heartbeatIntervalSecs = conf.getTimeDuration(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
+    long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
+    pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
+
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -4736,6 +4747,27 @@ public <T> T runBlockOp(final Callable<T> action)
     }
   }
 
+  /**
+   * Notification of a successful block recovery.
+   * @param block block for which the recovery succeeded
+   */
+  public void successfulBlockRecovery(BlockInfo block) {
+    pendingRecoveryBlocks.remove(block);
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
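+   * Callers should initialize a new recovery only if this returns true;
+   * a false return value means an earlier recovery attempt is still pending.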
+   * @param b block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  public boolean addBlockRecoveryAttempt(BlockInfo b) {
+    return pendingRecoveryBlocks.add(b);
+  }
+
   @VisibleForTesting
   public void flushBlockOps() throws IOException {
     runBlockOp(new Callable<Void>(){
@@ -4863,4 +4895,14 @@ private int setBlockIndices(BlockInfo blk, byte[] blockIndices, int i,
     }
     return i;
   }
+
+  private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
+    return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
+        BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
+  }
+
+  @VisibleForTesting
+  public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
+    pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
new file mode 100644
index 0000000000..3f5f27c819
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingRecoveryBlocks.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * PendingRecoveryBlocks tracks recovery attempts for each block and their
+ * timeouts, ensuring that a block has at most one recovery in flight and
+ * that a new recovery is issued only after the previous attempt times out.
+ */
+class PendingRecoveryBlocks {
+  private static final Logger LOG = BlockManager.LOG;
+
+  /** Recovery attempts per block and the time at which they expire. */
+  private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
+      new LightWeightHashSet<>();
+
+  /** The timeout before a block recovery may be issued again.
+   * (It should be larger than the time it takes to recover a block.)
+   */
+  private long recoveryTimeoutInterval;
+
+  PendingRecoveryBlocks(long timeout) {
+    this.recoveryTimeoutInterval = timeout;
+  }
+
+  /**
+   * Remove recovery attempt for the given block.
+   * @param block the block whose recovery attempt to remove
+   */
+  synchronized void remove(BlockInfo block) {
+    recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
+  }
+
+  /**
+   * Checks whether a recovery attempt has been made for the given block.
+   * If so, checks whether that attempt has timed out.
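+   * The check and the update are performed atomically (the method is
+   * synchronized), so concurrent callers cannot start duplicate recoveries.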
+   * @param block block for which recovery is being attempted
+   * @return true if no recovery attempt has been made or
+   *         the previous attempt timed out
+   */
+  synchronized boolean add(BlockInfo block) {
+    boolean added = false;
+    long curTime = getTime();
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
+
+    if (recoveryAttempt == null) {
+      BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
+          block, curTime + recoveryTimeoutInterval);
+      added = recoveryTimeouts.add(newAttempt);
+    } else if (recoveryAttempt.hasTimedOut(curTime)) {
+      // Previous attempt timed out, reset the timeout
+      recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
+      added = true;
+    } else {
+      long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
+          recoveryAttempt.timeoutAt - curTime);
+      LOG.info("Block recovery attempt for " + block + " rejected, as the " +
+          "previous attempt times out in " + timeoutIn + " seconds.");
+    }
+    return added;
+  }
+
+  /**
+   * Check whether the given block is under recovery.
+   * @param b block for which to check
+   * @return true if the given block is being recovered
+   */
+  synchronized boolean isUnderRecovery(BlockInfo b) {
+    BlockRecoveryAttempt recoveryAttempt =
+        recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
+    return recoveryAttempt != null;
+  }
+
+  long getTime() {
+    return Time.monotonicNow();
+  }
+
+  @VisibleForTesting
+  synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
+    this.recoveryTimeoutInterval = recoveryTimeoutInterval;
+  }
+
+  /**
+   * Tracks timeout for block recovery attempt of a given block.
+   */
+  private static class BlockRecoveryAttempt {
+    private final BlockInfo blockInfo;
+    private long timeoutAt;
+
+    private BlockRecoveryAttempt(BlockInfo blockInfo) {
+      this(blockInfo, 0);
+    }
+
+    BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
+      this.blockInfo = blockInfo;
+      this.timeoutAt = timeoutAt;
+    }
+
+    boolean hasTimedOut(long currentTime) {
+      return currentTime > timeoutAt;
+    }
+
+    void setTimeout(long newTimeoutAt) {
+      this.timeoutAt = newTimeoutAt;
+    }
+
+    @Override
+    public int hashCode() {
+      return blockInfo.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof BlockRecoveryAttempt) {
+        return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
+      }
+      return false;
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d3d9cdc42d..6a890e2907 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3318,25 +3318,32 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
             + "Removed empty last block and closed file " + src);
         return true;
       }
-      // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(
-          blockManager.isLegacyBlock(lastBlock));
-      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
-      if(copyOnTruncate) {
-        lastBlock.setGenerationStamp(blockRecoveryId);
-      } else if(truncateRecovery) {
-        recoveryBlock.setGenerationStamp(blockRecoveryId);
-      }
-      uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
-      leaseManager.renewLease(lease);
-      // Cannot close file right now, since the last block requires recovery.
-      // This may potentially cause infinite loop in lease recovery
-      // if there are no valid replicas on data-nodes.
-      NameNode.stateChangeLog.warn(
-          "DIR* NameSystem.internalReleaseLease: " +
+      // Start recovery of the last block for this file
+      // Only do so if there is no ongoing recovery for this block,
+      // or the previous recovery for this block timed out.
+      if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
+        long blockRecoveryId = nextGenerationStamp(
+            blockManager.isLegacyBlock(lastBlock));
+        if(copyOnTruncate) {
+          lastBlock.setGenerationStamp(blockRecoveryId);
+        } else if(truncateRecovery) {
+          recoveryBlock.setGenerationStamp(blockRecoveryId);
+        }
+        uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
+
+        // Cannot close file right now, since the last block requires recovery.
+        // This may potentially cause infinite loop in lease recovery
+        // if there are no valid replicas on data-nodes.
+        NameNode.stateChangeLog.warn(
+            "DIR* NameSystem.internalReleaseLease: " +
                 "File " + src + " has not been closed." +
-               " Lease recovery is in progress. " +
+                " Lease recovery is in progress. " +
                 "RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
+      }
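+      // Reassign and renew the lease even when the recovery attempt above
+      // was rejected, so that lease recovery is retried after the timeout.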
+      lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
+      leaseManager.renewLease(lease);
       break;
     }
     return false;
@@ -3604,6 +3611,7 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
         // If this commit does not want to close the file, persist blocks
         FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
       }
+      blockManager.successfulBlockRecovery(storedBlock);
     } finally {
       writeUnlock("commitBlockSynchronization");
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
new file mode 100644
index 0000000000..baad89f549
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingRecoveryBlocks.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class contains unit tests for PendingRecoveryBlocks.java functionality.
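+ * Recovery timeouts are simulated by stubbing the getTime() method of a
+ * Mockito spy rather than by sleeping.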
+ */
+public class TestPendingRecoveryBlocks {
+
+  private PendingRecoveryBlocks pendingRecoveryBlocks;
+  private final long recoveryTimeout = 1000L;
+
+  private final BlockInfo blk1 = getBlock(1);
+  private final BlockInfo blk2 = getBlock(2);
+  private final BlockInfo blk3 = getBlock(3);
+
+  @Before
+  public void setUp() {
+    pendingRecoveryBlocks =
+        Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
+  }
+
+  BlockInfo getBlock(long blockId) {
+    return new BlockInfoContiguous(new Block(blockId), (short) 0);
+  }
+
+  @Test
+  public void testAddDifferentBlocks() {
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
+    assertTrue(pendingRecoveryBlocks.add(blk3));
+    assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
+  }
+
+  @Test
+  public void testAddAndRemoveBlocks() {
+    // Add blocks
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+    assertTrue(pendingRecoveryBlocks.add(blk2));
+
+    // Remove blk1
+    pendingRecoveryBlocks.remove(blk1);
+
+    // Adding back blk1 should succeed
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+
+  @Test
+  public void testAddBlockWithPreviousRecoveryTimedOut() {
+    // Add blk1 at time 0
+    Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+
+    // Should fail, has not timed out yet
+    Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
+    assertFalse(pendingRecoveryBlocks.add(blk1));
+
+    // Should succeed after timing out
+    Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
+    assertTrue(pendingRecoveryBlocks.add(blk1));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 311d5a67c3..208447dfba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -43,6 +46,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -94,6 +98,7 @@
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
@@ -1035,4 +1040,112 @@ public void run() {
       Assert.fail("Thread failure: " + failureReason);
     }
   }
+
+  /**
+   * Test for block recovery taking longer than the heartbeat interval.
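+   * The commitBlockSynchronization calls back to the NameNode are delayed
+   * by 3-6 seconds while the heartbeat interval is set to 1 second, so
+   * the recovery must survive several heartbeats.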
+   */
+  @Test(timeout = 300000L)
+  public void testRecoverySlowerThanHeartbeat() throws Exception {
+    tearDown(); // Stop the mocked DN started in startup()
+
+    SleepAnswer delayer = new SleepAnswer(3000, 6000);
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  /**
+   * Test for the block recovery timeout. All recovery attempts are delayed,
+   * and the first one is dropped to trigger a recovery timeout and retry.
+   */
+  @Test(timeout = 300000L)
+  public void testRecoveryTimeout() throws Exception {
+    tearDown(); // Stop the mocked DN started in startup()
+    final Random r = new Random();
+
+    // Make sure the first commitBlockSynchronization call from the DN gets
+    // lost, so that the recovery timeout expires and a new recovery attempt
+    // is started.
+    SleepAnswer delayer = new SleepAnswer(3000) {
+      private final AtomicBoolean callRealMethod = new AtomicBoolean();
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        boolean interrupted = false;
+        try {
+          Thread.sleep(r.nextInt(3000) + 6000);
+        } catch (InterruptedException ie) {
+          interrupted = true;
+        }
+        try {
+          if (callRealMethod.get()) {
+            return invocation.callRealMethod();
+          }
+          callRealMethod.set(true);
+          return null;
+        } finally {
+          if (interrupted) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    };
+    testRecoveryWithDatanodeDelayed(delayer);
+  }
+
+  private void testRecoveryWithDatanodeDelayed(
+      GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
+    Configuration configuration = new HdfsConfiguration();
+    configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(configuration)
+          .numDataNodes(2).build();
+      cluster.waitActive();
+      final FSNamesystem ns = cluster.getNamesystem();
+      final NameNode nn = cluster.getNameNode();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      ns.getBlockManager().setBlockRecoveryTimeout(
+          TimeUnit.SECONDS.toMillis(10));
+
+      // Create a file and never close the output stream to trigger recovery
+      FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
+          (short) 2);
+      out.write(AppendTestUtil.randomBytes(0, 4096));
+      out.hsync();
+
+      List<DataNode> dataNodes = cluster.getDataNodes();
+      for (DataNode datanode : dataNodes) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
+
+        Mockito.doAnswer(recoveryDelayer).when(nnSpy).
+            commitBlockSynchronization(
+                Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
+                Mockito.anyLong(), Mockito.anyBoolean(),
+                Mockito.anyBoolean(), Mockito.anyObject(),
+                Mockito.anyObject());
+      }
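+      // Every commitBlockSynchronization call from the DNs to the NN is
+      // now routed through the delaying spy installed above.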
+
+      // Make sure hard lease expires to trigger replica recovery
+      cluster.setLeasePeriod(100L, 100L);
+
+      // Wait for recovery to succeed
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return ns.getCompleteBlocksTotal() > 0;
+        }
+      }, 300, 300000);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index dc7f47a79c..a5655787a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -278,12 +279,17 @@ public void testLeaseRecoveryAfterFailover() throws Exception {
     // Disable permissions so that another user can recover the lease.
     conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    
+
     FSDataOutputStream stm = null;
     final MiniDFSCluster cluster = newMiniCluster(conf, 3);
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
+      cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
+          TimeUnit.SECONDS.toMillis(1));
       Thread.sleep(500);
 
       LOG.info("Starting with NN 0 active");
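+
+      // The 1 second block recovery timeout set above (instead of the
+      // default 30 heartbeat intervals) keeps lease recovery retries fast.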