Revert "HDFS-11576. Block recovery will fail indefinitely if recovery time > heartbeat interval. Contributed by Lukas Majercak"

This reverts commit 5304698dc8.
This commit is contained in:
Chris Douglas 2017-12-01 11:19:01 -08:00
parent 7225ec0ceb
commit 53bbef3802
7 changed files with 20 additions and 413 deletions

View File

@ -641,16 +641,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
private final int minSleepTime;
private final int maxSleepTime;
private static Random r = new Random();
public SleepAnswer(int maxSleepTime) {
this(0, maxSleepTime);
}
public SleepAnswer(int minSleepTime, int maxSleepTime) {
this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime;
}
@ -658,7 +652,7 @@ public SleepAnswer(int minSleepTime, int maxSleepTime) {
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(maxSleepTime) + minSleepTime);
Thread.sleep(r.nextInt(maxSleepTime));
} catch (InterruptedException ie) {
interrupted = true;
}

View File

@ -164,8 +164,6 @@ public class BlockManager implements BlockStatsMXBean {
private static final String QUEUE_REASON_FUTURE_GENSTAMP =
"generation stamp is in the future";
private static final long BLOCK_RECOVERY_TIMEOUT_MULTIPLIER = 30;
private final Namesystem namesystem;
private final BlockManagerSafeMode bmSafeMode;
@ -355,9 +353,6 @@ public long getTotalECBlockGroups() {
@VisibleForTesting
final PendingReconstructionBlocks pendingReconstruction;
/** Stores information about block recovery attempts. */
private final PendingRecoveryBlocks pendingRecoveryBlocks;
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
/**
@ -554,12 +549,6 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
long heartbeatIntervalSecs = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@ -4747,25 +4736,6 @@ public <T> T runBlockOp(final Callable<T> action)
}
}
/**
* Notification of a successful block recovery.
* @param block for which the recovery succeeded
*/
public void successfulBlockRecovery(BlockInfo block) {
pendingRecoveryBlocks.remove(block);
}
/**
* Checks whether a recovery attempt has been made for the given block.
* If so, checks whether that attempt has timed out.
* @param b block for which recovery is being attempted
* @return true if no recovery attempt has been made or
* the previous attempt timed out
*/
public boolean addBlockRecoveryAttempt(BlockInfo b) {
return pendingRecoveryBlocks.add(b);
}
@VisibleForTesting
public void flushBlockOps() throws IOException {
runBlockOp(new Callable<Void>(){
@ -4893,14 +4863,4 @@ private int setBlockIndices(BlockInfo blk, byte[] blockIndices, int i,
}
return i;
}
private static long getBlockRecoveryTimeout(long heartbeatIntervalSecs) {
return TimeUnit.SECONDS.toMillis(heartbeatIntervalSecs *
BLOCK_RECOVERY_TIMEOUT_MULTIPLIER);
}
@VisibleForTesting
public void setBlockRecoveryTimeout(long blockRecoveryTimeout) {
pendingRecoveryBlocks.setRecoveryTimeoutInterval(blockRecoveryTimeout);
}
}

View File

@ -1,143 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* PendingRecoveryBlocks tracks recovery attempts for each block and their
* timeouts to ensure we do not have multiple recoveries at the same time
* and retry only after the timeout for a recovery has expired.
*/
class PendingRecoveryBlocks {
private static final Logger LOG = BlockManager.LOG;
/** List of recovery attempts per block and the time they expire. */
private final LightWeightHashSet<BlockRecoveryAttempt> recoveryTimeouts =
new LightWeightHashSet<>();
/** The timeout for issuing a block recovery again.
* (it should be larger than the time to recover a block)
*/
private long recoveryTimeoutInterval;
PendingRecoveryBlocks(long timeout) {
this.recoveryTimeoutInterval = timeout;
}
/**
* Remove recovery attempt for the given block.
* @param block whose recovery attempt to remove.
*/
synchronized void remove(BlockInfo block) {
recoveryTimeouts.remove(new BlockRecoveryAttempt(block));
}
/**
* Checks whether a recovery attempt has been made for the given block.
* If so, checks whether that attempt has timed out.
* @param block block for which recovery is being attempted
* @return true if no recovery attempt has been made or
* the previous attempt timed out
*/
synchronized boolean add(BlockInfo block) {
boolean added = false;
long curTime = getTime();
BlockRecoveryAttempt recoveryAttempt =
recoveryTimeouts.getElement(new BlockRecoveryAttempt(block));
if (recoveryAttempt == null) {
BlockRecoveryAttempt newAttempt = new BlockRecoveryAttempt(
block, curTime + recoveryTimeoutInterval);
added = recoveryTimeouts.add(newAttempt);
} else if (recoveryAttempt.hasTimedOut(curTime)) {
// Previous attempt timed out, reset the timeout
recoveryAttempt.setTimeout(curTime + recoveryTimeoutInterval);
added = true;
} else {
long timeoutIn = TimeUnit.MILLISECONDS.toSeconds(
recoveryAttempt.timeoutAt - curTime);
LOG.info("Block recovery attempt for " + block + " rejected, as the " +
"previous attempt times out in " + timeoutIn + " seconds.");
}
return added;
}
/**
* Check whether the given block is under recovery.
* @param b block for which to check
* @return true if the given block is being recovered
*/
synchronized boolean isUnderRecovery(BlockInfo b) {
BlockRecoveryAttempt recoveryAttempt =
recoveryTimeouts.getElement(new BlockRecoveryAttempt(b));
return recoveryAttempt != null;
}
long getTime() {
return Time.monotonicNow();
}
@VisibleForTesting
synchronized void setRecoveryTimeoutInterval(long recoveryTimeoutInterval) {
this.recoveryTimeoutInterval = recoveryTimeoutInterval;
}
/**
* Tracks timeout for block recovery attempt of a given block.
*/
private static class BlockRecoveryAttempt {
private final BlockInfo blockInfo;
private long timeoutAt;
private BlockRecoveryAttempt(BlockInfo blockInfo) {
this(blockInfo, 0);
}
BlockRecoveryAttempt(BlockInfo blockInfo, long timeoutAt) {
this.blockInfo = blockInfo;
this.timeoutAt = timeoutAt;
}
boolean hasTimedOut(long currentTime) {
return currentTime > timeoutAt;
}
void setTimeout(long newTimeoutAt) {
this.timeoutAt = newTimeoutAt;
}
@Override
public int hashCode() {
return blockInfo.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BlockRecoveryAttempt) {
return this.blockInfo.equals(((BlockRecoveryAttempt) obj).blockInfo);
}
return false;
}
}
}

View File

@ -3318,30 +3318,25 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
+ "Removed empty last block and closed file " + src);
return true;
}
// Start recovery of the last block for this file
// Only do so if there is no ongoing recovery for this block,
// or the previous recovery for this block timed out.
if (blockManager.addBlockRecoveryAttempt(lastBlock)) {
long blockRecoveryId = nextGenerationStamp(
blockManager.isLegacyBlock(lastBlock));
if(copyOnTruncate) {
lastBlock.setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId);
}
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
// if there are no valid replicas on data-nodes.
NameNode.stateChangeLog.warn(
"DIR* NameSystem.internalReleaseLease: " +
"File " + src + " has not been closed." +
" Lease recovery is in progress. " +
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
}
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(
blockManager.isLegacyBlock(lastBlock));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
if(copyOnTruncate) {
lastBlock.setGenerationStamp(blockRecoveryId);
} else if(truncateRecovery) {
recoveryBlock.setGenerationStamp(blockRecoveryId);
}
uc.initializeBlockRecovery(lastBlock, blockRecoveryId, true);
leaseManager.renewLease(lease);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
// if there are no valid replicas on data-nodes.
NameNode.stateChangeLog.warn(
"DIR* NameSystem.internalReleaseLease: " +
"File " + src + " has not been closed." +
" Lease recovery is in progress. " +
"RecoveryId = " + blockRecoveryId + " for block " + lastBlock);
break;
}
return false;
@ -3609,7 +3604,6 @@ void commitBlockSynchronization(ExtendedBlock oldBlock,
// If this commit does not want to close the file, persist blocks
FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
}
blockManager.successfulBlockRecovery(storedBlock);
} finally {
writeUnlock("commitBlockSynchronization");
}

View File

@ -1,87 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* This class contains unit tests for PendingRecoveryBlocks.java functionality.
*/
public class TestPendingRecoveryBlocks {
private PendingRecoveryBlocks pendingRecoveryBlocks;
private final long recoveryTimeout = 1000L;
private final BlockInfo blk1 = getBlock(1);
private final BlockInfo blk2 = getBlock(2);
private final BlockInfo blk3 = getBlock(3);
@Before
public void setUp() {
pendingRecoveryBlocks =
Mockito.spy(new PendingRecoveryBlocks(recoveryTimeout));
}
BlockInfo getBlock(long blockId) {
return new BlockInfoContiguous(new Block(blockId), (short) 0);
}
@Test
public void testAddDifferentBlocks() {
assertTrue(pendingRecoveryBlocks.add(blk1));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk1));
assertTrue(pendingRecoveryBlocks.add(blk2));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk2));
assertTrue(pendingRecoveryBlocks.add(blk3));
assertTrue(pendingRecoveryBlocks.isUnderRecovery(blk3));
}
@Test
public void testAddAndRemoveBlocks() {
// Add blocks
assertTrue(pendingRecoveryBlocks.add(blk1));
assertTrue(pendingRecoveryBlocks.add(blk2));
// Remove blk1
pendingRecoveryBlocks.remove(blk1);
// Adding back blk1 should succeed
assertTrue(pendingRecoveryBlocks.add(blk1));
}
@Test
public void testAddBlockWithPreviousRecoveryTimedOut() {
// Add blk
Mockito.doReturn(0L).when(pendingRecoveryBlocks).getTime();
assertTrue(pendingRecoveryBlocks.add(blk1));
// Should fail, has not timed out yet
Mockito.doReturn(recoveryTimeout / 2).when(pendingRecoveryBlocks).getTime();
assertFalse(pendingRecoveryBlocks.add(blk1));
// Should succeed after timing out
Mockito.doReturn(recoveryTimeout * 2).when(pendingRecoveryBlocks).getTime();
assertTrue(pendingRecoveryBlocks.add(blk1));
}
}

View File

@ -18,10 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -46,7 +43,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -98,7 +94,6 @@
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.SleepAnswer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
@ -1040,107 +1035,4 @@ public void run() {
Assert.fail("Thread failure: " + failureReason);
}
}
/**
* Test for block recovery taking longer than the heartbeat interval.
*/
@Test(timeout = 300000L)
public void testRecoverySlowerThanHeartbeat() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
SleepAnswer delayer = new SleepAnswer(3000, 6000);
testRecoveryWithDatanodeDelayed(delayer);
}
/**
* Test for block recovery timeout. All recovery attempts will be delayed
* and the first attempt will be lost to trigger recovery timeout and retry.
*/
@Test(timeout = 300000L)
public void testRecoveryTimeout() throws Exception {
tearDown(); // Stop the Mocked DN started in startup()
final Random r = new Random();
// Make sure first commitBlockSynchronization call from the DN gets lost
// for the recovery timeout to expire and new recovery attempt
// to be started.
SleepAnswer delayer = new SleepAnswer(3000) {
private final AtomicBoolean callRealMethod = new AtomicBoolean();
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(3000) + 6000);
} catch (InterruptedException ie) {
interrupted = true;
}
try {
if (callRealMethod.get()) {
return invocation.callRealMethod();
}
callRealMethod.set(true);
return null;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
};
testRecoveryWithDatanodeDelayed(delayer);
}
private void testRecoveryWithDatanodeDelayed(
GenericTestUtils.SleepAnswer recoveryDelayer) throws Exception {
Configuration configuration = new HdfsConfiguration();
configuration.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(configuration)
.numDataNodes(2).build();
cluster.waitActive();
final FSNamesystem ns = cluster.getNamesystem();
final NameNode nn = cluster.getNameNode();
final DistributedFileSystem dfs = cluster.getFileSystem();
ns.getBlockManager().setBlockRecoveryTimeout(
TimeUnit.SECONDS.toMillis(10));
// Create a file and never close the output stream to trigger recovery
FSDataOutputStream out = dfs.create(new Path("/testSlowRecovery"),
(short) 2);
out.write(AppendTestUtil.randomBytes(0, 4096));
out.hsync();
List<DataNode> dataNodes = cluster.getDataNodes();
for (DataNode datanode : dataNodes) {
DatanodeProtocolClientSideTranslatorPB nnSpy =
InternalDataNodeTestUtils.spyOnBposToNN(datanode, nn);
Mockito.doAnswer(recoveryDelayer).when(nnSpy).
commitBlockSynchronization(
Mockito.any(ExtendedBlock.class), Mockito.anyInt(),
Mockito.anyLong(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.anyObject(),
Mockito.anyObject());
}
// Make sure hard lease expires to trigger replica recovery
cluster.setLeasePeriod(100L, 100L);
// Wait for recovery to succeed
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return ns.getCompleteBlocksTotal() > 0;
}
}, 300, 300000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -25,7 +25,6 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@ -279,14 +278,12 @@ public void testLeaseRecoveryAfterFailover() throws Exception {
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = newMiniCluster(conf, 3);
try {
cluster.waitActive();
cluster.transitionToActive(0);
cluster.getNamesystem().getBlockManager().setBlockRecoveryTimeout(
TimeUnit.SECONDS.toMillis(1));
Thread.sleep(500);
LOG.info("Starting with NN 0 active");