HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak

This commit is contained in:
Chris Douglas 2017-12-01 22:34:30 -08:00
parent f9d195dfe9
commit 42307e3c3a
8 changed files with 421 additions and 20 deletions

View File

@ -641,10 +641,16 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
public SleepAnswer(int maxSleepTime) {
this(0, maxSleepTime);
}
public SleepAnswer(int minSleepTime, int maxSleepTime) {
this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@ -652,7 +658,7 @@ public SleepAnswer(int maxSleepTime) {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(maxSleepTime));
Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
} catch (InterruptedException ie) {
interrupted = true;
}

View File

@ -164,6 +164,8 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@ -353,6 +355,9 @@ public long getTotalECBlockGroups() {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
/** Stores information about block recovery attempts. */
private final PendingRecoveryBlocks pendingRecoveryBlocks;
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@ -549,6 +554,12 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
long heartbeatIntervalSecs = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@ -4736,6 +4747,25 @@ public <T> T runBlockOp(final Callable<T> action)
}
}
/**
* Notification of a successful block recovery.
* @param block for which the recovery succeeded
*/
public void successfulBlockRecovery(BlockInfo block) {
pendingRecoveryBlocks.remove(block);
}
/**
* Checks whether a recovery attempt has been made for the given block.
* If so, checks whether that attempt has timed out.
* @param b block for which recovery is being attempted
* @return true if no recovery attempt has been made or
* the previous attempt timed out
*/
public boolean addBlockRecoveryAttempt(BlockInfo b) {
return pendingRecoveryBlocks.add(b);
}
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@ -4863,4 +4893,14 @@ private int setBlockIndices(BlockInfo blk, byte[] blockIndices, int i,
}
return i;
}
private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
}
@VisibleForTesting
public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* PendingRecoveryBlocks tracks recovery attempts for each block and their
* timeouts to ensure we do not have multiple recoveries at the same time
* and retry only after the timeout for a recovery has expired.
*/
class PendingRecoveryBlocks {
private static final Logger LOG = BlockManager.LOG;
/** List of recovery attempts per block and the time they expire. */
private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
new LightWeightHashSet<>();
/** The timeout for issuing a block recovery again.
* (it should be larger than the time to recover a block)
*/
private long recoveryTimeoutInterval;
PendingRecoveryBlocks(long timeout) {
this.recoveryTimeoutInterval = timeout;
}
/**
* Remove recovery attempt for the given block.
* @param block whose recovery attempt to remove.
*/
synchronized void remove(BlockInfo block) {
recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
}
/**
* Checks whether a recovery attempt has been made for the given block.
* If so, checks whether that attempt has timed out.
* @param block block for which recovery is being attempted
* @return true if no recovery attempt has been made or
* the previous attempt timed out
*/
synchronized boolean add(BlockInfo block) {
boolean added = false;
long curTime = getTime();
BlockRecoveryAttempt recoveryAttempt =
recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
if (recoveryAttempt == null) {
BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
block, curTime + recoveryTimeoutInterval);
added = recoveryTimeouts.add(newAttempt);
} else if (recoveryAttempt.hasTimedOut(curTime)) {
// Previous attempt timed out, reset the timeout
recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
added = true;
} else {
long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
recoveryAttempt.timeoutAt - curTime);
LOG.info("Block recovery attempt for " + block + " rejected, as the " +
"previous attempt times out in " + timeoutIn + " seconds.");
}
return added;
}
/**
* Check whether the given block is under recovery.
* @param b block for which to check
* @return true if the given block is being recovered
*/
synchronized boolean isUnderRecovery(BlockInfo b) {
BlockRecoveryAttempt recoveryAttempt =
recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
return recoveryAttempt != null;
}
long getTime() {
return Time.monotonicNow();
}
@VisibleForTesting
synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
this.recoveryTimeoutInterval = recoveryTimeoutInterval;
}
/**
* Tracks timeout for block recovery attempt of a given block.
*/
private static class BlockRecoveryAttempt {
private final BlockInfo blockInfo;
private long timeoutAt;
private BlockRecoveryAttempt(BlockInfo blockInfo) {
this(blockInfo, 0);
}
BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
this.blockInfo = blockInfo;
this.timeoutAt = timeoutAt;
}
boolean hasTimedOut(long currentTime) {
return currentTime > timeoutAt;
}
void setTimeout(long newTimeoutAt) {
this.timeoutAt = newTimeoutAt;
}
@Override
public int hashCode() {
return blockInfo.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BlockRecoveryAttempt) {
return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
}
return false;
}
}
}

View File

@ -3318,25 +3318,30 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
+ "Removed empty last block and closed file " + src);
return true;
}
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(
blockManager.isLegacyBlock(lastBlock));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
if(copyOnTruncate) {
lastBlock.setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId);
}
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
leaseManager.renewLease(lease);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
// if there are no valid replicas on data-nodes.
NameNode.stateChangeLog.warn(
"DIR* NameSystem.internalReleaseLease: " +
// Start recovery of the last block for this file
// Only do so if there is no ongoing recovery for this block,
// or the previous recovery for this block timed out.
if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
long blockRecoveryId = nextGenerationStamp(
blockManager.isLegacyBlock(lastBlock));
if(copyOnTruncate) {
lastBlock.setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId);
}
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
// if there are no valid replicas on data-nodes.
NameNode.stateChangeLog.warn(
"DIR* NameSystem.internalReleaseLease: " +
"File " + src + " has not been closed." +
" Lease recovery is in progress. " +
" Lease recovery is in progress. " +
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
}
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
leaseManager.renewLease(lease);
break;
}
return false;
@ -3604,6 +3609,7 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}

View File

@ -3100,6 +3100,16 @@ public void addNameNode(Configuration conf, int namenodePort)
// Wait for new namenode to get registrations from all the datanodes
waitActive(nnIndex);
}
/**
* Sets the timeout for re-issuing a block recovery.
*/
public void setBlockRecoveryTimeout(long timeout) {
for (int nnIndex = 0; nnIndex < getNumNameNodes(); nnIndex++) {
getNamesystem(nnIndex).getBlockManager().setBlockRecoveryTimeout(
timeout);
}
}
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* This class contains unit tests for PendingRecoveryBlocks.java functionality.
*/
public class TestPendingRecoveryBlocks {
private PendingRecoveryBlocks pendingRecoveryBlocks;
private final long recoveryTimeout = 1000L;
private final BlockInfo blk1 = getBlock(1);
private final BlockInfo blk2 = getBlock(2);
private final BlockInfo blk3 = getBlock(3);
@Before
public void setUp() {
pendingRecoveryBlocks =
Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
}
BlockInfo getBlock(long blockId) {
return new BlockInfoContiguous(new Block(blockId), (short) 0);
}
@Test
public void testAddDifferentBlocks() {
assertTrue(pendingRecoveryBlocks.add(blk1));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
assertTrue(pendingRecoveryBlocks.add(blk2));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
assertTrue(pendingRecoveryBlocks.add(blk3));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
}
@Test
public void testAddAndRemoveBlocks() {
// Add blocks
assertTrue(pendingRecoveryBlocks.add(blk1));
assertTrue(pendingRecoveryBlocks.add(blk2));
// Remove blk1
pendingRecoveryBlocks.remove(blk1);
// Adding back blk1 should succeed
assertTrue(pendingRecoveryBlocks.add(blk1));
}
@Test
public void testAddBlockWithPreviousRecoveryTimedOut() {
// Add blk
Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
assertTrue(pendingRecoveryBlocks.add(blk1));
// Should fail, has not timed out yet
Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
assertFalse(pendingRecoveryBlocks.add(blk1));
// Should succeed after timing out
Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
assertTrue(pendingRecoveryBlocks.add(blk1));
}
}

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -43,6 +46,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -94,6 +98,7 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@ -1035,4 +1040,106 @@ public void run() {
Assert.fail("Thread failure: " + failureReason);
}
}
/**
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
SleepAnswer delayer = new SleepAnswer(3000, 6000);
testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
SleepAnswer delayer = new SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
testRecoveryWithDatanodeDelayed(delayer);
}
private void testRecoveryWithDatanodeDelayed(
GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration)
.numDataNodes(2).build();
cluster.waitActive();
final FSNamesystem ns = cluster.getNamesystem();
final NameNode nn = cluster.getNameNode();
final DistributedFileSystem dfs = cluster.getFileSystem();
cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(15));
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
(short) 2);
out.write(AppendTestUtil.randomBytes(0, 4096));
out.hsync();
List<DataNode> dataNodes = cluster.getDataNodes();
for (DataNode datanode : dataNodes) {
DatanodeProtocolClientSideTranslatorPB nnSpy =
InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
Mockito.doAnswer(recoveryDelayer).when(nnSpy).
commitBlockSynchronization(
Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
Mockito.anyLong(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(DatanodeID[].class),
Mockito.any(String[].class));
}
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return ns.getCompleteBlocksTotal() > 0;
}
}, 300, 300000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -25,6 +25,7 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@ -278,12 +279,13 @@ public void testLeaseRecoveryAfterFailover() throws Exception {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
cluster.setBlockRecoveryTimeout(TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");