HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the block scanner (cmccabe)

This commit is contained in:
Colin Patrick Mccabe 2015-02-13 14:35:49 -08:00
parent 875256834b
commit 8bb9a5000e
5 changed files with 269 additions and 35 deletions

View File

@ -959,6 +959,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7776. Adding additional unit tests for Quota By Storage Type.
(Xiaoyu Yao via Arpit Agarwal)
HDFS-7686. Re-add rapid rescan of possibly corrupt block feature to the
block scanner (cmccabe)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
@ -278,6 +279,37 @@ synchronized void printStats(StringBuilder p) {
}
}
/**
* Mark a block as "suspect."
*
* This means that we should try to rescan it soon. Note that the
* VolumeScanner keeps a list of recently suspicious blocks, which
* it uses to avoid rescanning the same block over and over in a short
* time frame.
*
* @param storageId The ID of the storage where the block replica
* is being stored.
* @param block The block's ID and block pool id.
*/
synchronized void markSuspectBlock(String storageId, ExtendedBlock block) {
if (!isEnabled()) {
LOG.info("Not scanning suspicious block {} on {}, because the block " +
"scanner is disabled.", block, storageId);
return;
}
VolumeScanner scanner = scanners.get(storageId);
if (scanner == null) {
// This could happen if the volume is in the process of being removed.
// The removal process shuts down the VolumeScanner, but the volume
// object stays around as long as there are references to it (which
// should not be that long.)
LOG.info("Not scanning suspicious block {} on {}, because there is no " +
"volume scanner for that storageId.", block, storageId);
return;
}
scanner.markSuspectBlock(block);
}
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;

View File

@ -601,6 +601,9 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
}
datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(),
block);
}
throw ioeToSocketException(e);
}

View File

@ -22,12 +22,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
@ -116,6 +119,21 @@ public class VolumeScanner extends Thread {
private final List<BlockIterator> blockIters =
new LinkedList<BlockIterator>();
/**
* Blocks which are suspect.
* The scanner prioritizes scanning these blocks.
*/
private final LinkedHashSet<ExtendedBlock> suspectBlocks =
new LinkedHashSet<ExtendedBlock>();
/**
* Blocks which were suspect which we have scanned.
* This is used to avoid scanning the same suspect block over and over.
*/
private final Cache<ExtendedBlock, Boolean> recentSuspectBlocks =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(10, TimeUnit.MINUTES).build();
/**
* The current block iterator, or null if there is none.
*/
@ -458,10 +476,13 @@ static boolean calculateShouldScan(String storageId, long targetBytesPerSec,
/**
* Run an iteration of the VolumeScanner loop.
*
* @param suspectBlock A suspect block which we should scan, or null to
* scan the next regularly scheduled block.
*
* @return The number of milliseconds to delay before running the loop
* again, or 0 to re-run the loop immediately.
*/
private long runLoop() {
private long runLoop(ExtendedBlock suspectBlock) {
long bytesScanned = -1;
boolean scanError = false;
ExtendedBlock block = null;
@ -477,40 +498,43 @@ private long runLoop() {
}
// Find a usable block pool to scan.
if ((curBlockIter == null) || curBlockIter.atEnd()) {
long timeout = findNextUsableBlockIter();
if (timeout > 0) {
LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
"{} ms.", this, timeout);
synchronized (stats) {
stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
if (suspectBlock != null) {
block = suspectBlock;
} else {
if ((curBlockIter == null) || curBlockIter.atEnd()) {
long timeout = findNextUsableBlockIter();
if (timeout > 0) {
LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
"{} ms.", this, timeout);
synchronized (stats) {
stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
}
return timeout;
}
return timeout;
synchronized (stats) {
stats.scansSinceRestart++;
stats.blocksScannedInCurrentPeriod = 0;
stats.nextBlockPoolScanStartMs = -1;
}
return 0L;
}
synchronized (stats) {
stats.scansSinceRestart++;
stats.blocksScannedInCurrentPeriod = 0;
stats.nextBlockPoolScanStartMs = -1;
try {
block = curBlockIter.nextBlock();
} catch (IOException e) {
// There was an error listing the next block in the volume. This is a
// serious issue.
LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
// On the next loop iteration, curBlockIter#eof will be set to true, and
// we will pick a different block iterator.
return 0L;
}
if (block == null) {
// The BlockIterator is at EOF.
LOG.info("{}: finished scanning block pool {}",
this, curBlockIter.getBlockPoolId());
saveBlockIterator(curBlockIter);
return 0;
}
return 0L;
}
try {
block = curBlockIter.nextBlock();
} catch (IOException e) {
// There was an error listing the next block in the volume. This is a
// serious issue.
LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
// On the next loop iteration, curBlockIter#eof will be set to true, and
// we will pick a different block iterator.
return 0L;
}
if (block == null) {
// The BlockIterator is at EOF.
LOG.info("{}: finished scanning block pool {}",
this, curBlockIter.getBlockPoolId());
saveBlockIterator(curBlockIter);
return 0;
}
long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
if (saveDelta >= conf.cursorSaveMs) {
@ -529,7 +553,7 @@ private long runLoop() {
} finally {
synchronized (stats) {
stats.bytesScannedInPastHour = scannedBytesSum;
if (bytesScanned >= 0) {
if (bytesScanned > 0) {
stats.blocksScannedInCurrentPeriod++;
stats.blocksScannedSinceRestart++;
}
@ -551,6 +575,20 @@ private long runLoop() {
}
}
/**
* If there are elements in the suspectBlocks list, removes
* and returns the first one. Otherwise, returns null.
*/
private synchronized ExtendedBlock popNextSuspectBlock() {
Iterator<ExtendedBlock> iter = suspectBlocks.iterator();
if (!iter.hasNext()) {
return null;
}
ExtendedBlock block = iter.next();
iter.remove();
return block;
}
@Override
public void run() {
// Record the minute on which the scanner started.
@ -563,7 +601,9 @@ public void run() {
try {
long timeout = 0;
while (true) {
// Take the lock to check if we should stop.
ExtendedBlock suspectBlock = null;
// Take the lock to check if we should stop, and access the
// suspect block list.
synchronized (this) {
if (stopping) {
break;
@ -574,8 +614,9 @@ public void run() {
break;
}
}
suspectBlock = popNextSuspectBlock();
}
timeout = runLoop();
timeout = runLoop(suspectBlock);
}
} catch (InterruptedException e) {
// We are exiting because of an InterruptedException,
@ -612,6 +653,30 @@ public synchronized void shutdown() {
this.interrupt();
}
public synchronized void markSuspectBlock(ExtendedBlock block) {
if (stopping) {
LOG.info("{}: Not scheduling suspect block {} for " +
"rescanning, because this volume scanner is stopping.", this, block);
return;
}
Boolean recent = recentSuspectBlocks.getIfPresent(block);
if (recent != null) {
LOG.info("{}: Not scheduling suspect block {} for " +
"rescanning, because we rescanned it recently.", this, block);
return;
}
if (suspectBlocks.contains(block)) {
LOG.info("{}: suspect block {} is already queued for " +
"rescanning.", this, block);
return;
}
suspectBlocks.add(block);
recentSuspectBlocks.put(block, true);
LOG.info("{}: Scheduling suspect block {} for rescanning.", this, block);
notify(); // wake scanner thread.
}
/**
* Allow the scanner to scan the given block pool.
*

View File

@ -268,6 +268,20 @@ static class Info {
final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
long blocksScanned = 0;
Semaphore sem = null;
@Override
public String toString() {
final StringBuilder bld = new StringBuilder();
bld.append("ScanResultHandler.Info{");
bld.append("shouldRun=").append(shouldRun).append(", ");
bld.append("blocksScanned=").append(blocksScanned).append(", ");
bld.append("sem#availablePermits=").append(sem.availablePermits()).
append(", ");
bld.append("badBlocks=").append(badBlocks).append(", ");
bld.append("goodBlocks=").append(goodBlocks);
bld.append("}");
return bld.toString();
}
}
private VolumeScanner scanner;
@ -681,4 +695,121 @@ public void testCalculateNeededBytesPerSec() throws Exception {
Assert.assertFalse(VolumeScanner.
calculateShouldScan("test", 100000L, 365000000L, 0, 60));
}
/**
* Test that we can mark certain blocks as suspect, and get them quickly
* rescanned that way. See HDFS-7686 and HDFS-7548.
*/
@Test(timeout=120000)
public void testMarkSuspectBlock() throws Exception {
Configuration conf = new Configuration();
// Set a really long scan period.
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
TestScanResultHandler.class.getName());
conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
final TestContext ctx = new TestContext(conf, 1);
final int NUM_EXPECTED_BLOCKS = 10;
ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
final TestScanResultHandler.Info info =
TestScanResultHandler.getInfo(ctx.volumes.get(0));
String storageID = ctx.datanode.getFSDataset().
getVolumes().get(0).getStorageID();
synchronized (info) {
info.sem = new Semaphore(4);
info.shouldRun = true;
info.notify();
}
// Scan the first 4 blocks
LOG.info("Waiting for the first 4 blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 4) {
LOG.info("info = {}. blockScanned has now reached 4.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 4.", info);
return false;
}
}
}
}, 50, 30000);
// We should have scanned 4 blocks
synchronized (info) {
assertEquals("Expected 4 good blocks.", 4, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Expected 4 blocksScanned", 4, info.blocksScanned);
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
info.blocksScanned = 0;
}
ExtendedBlock first = ctx.getFileBlock(0, 0);
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
// When we increment the semaphore, the TestScanResultHandler will finish
// adding the block that it was scanning previously (the 5th block).
// We increment the semaphore twice so that the handler will also
// get a chance to see the suspect block which we just requested the
// VolumeScanner to process.
info.sem.release(2);
LOG.info("Waiting for 2 more blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 2) {
LOG.info("info = {}. blockScanned has now reached 2.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 2.", info);
return false;
}
}
}
}, 50, 30000);
synchronized (info) {
assertTrue("Expected block " + first + " to have been scanned.",
info.goodBlocks.contains(first));
assertEquals(2, info.goodBlocks.size());
info.goodBlocks.clear();
assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
assertEquals(2, info.blocksScanned);
info.blocksScanned = 0;
}
// Re-mark the same block as suspect.
ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
info.sem.release(10);
LOG.info("Waiting for 5 more blocks to be scanned.");
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (info) {
if (info.blocksScanned >= 5) {
LOG.info("info = {}. blockScanned has now reached 5.", info);
return true;
} else {
LOG.info("info = {}. Waiting for blockScanned to reach 5.", info);
return false;
}
}
}
}, 50, 30000);
synchronized (info) {
assertEquals(5, info.goodBlocks.size());
assertEquals(0, info.badBlocks.size());
assertEquals(5, info.blocksScanned);
// We should not have rescanned the "suspect block",
// because it was recently rescanned by the suspect block system.
// This is a test of the "suspect block" rate limiting.
Assert.assertFalse("We should not " +
"have rescanned block " + first + ", because it should have been " +
"in recentSuspectBlocks.", info.goodBlocks.contains(first));
info.blocksScanned = 0;
}
}
}