HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. Contributed by Yi Liu and Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617872 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uma Maheswara Rao G 2014-08-14 04:20:00 +00:00
parent e90d940699
commit 44864c68b5
2 changed files with 35 additions and 22 deletions

View File

@ -506,6 +506,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate
responses to Balancer (vinayakumarb) responses to Balancer (vinayakumarb)
HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu via umamahesh)
Release 2.5.0 - UNRELEASED Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -103,22 +103,22 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
*/ */
private final Condition scanFinished; private final Condition scanFinished;
/**
* Whether there are pending CacheManager operations that necessitate a
* CacheReplicationMonitor rescan. Protected by the CRM lock.
*/
private boolean needsRescan = true;
/**
* Whether we are currently doing a rescan. Protected by the CRM lock.
*/
private boolean isScanning = false;
/** /**
* The number of rescans completed. Used to wait for scans to finish. * The number of rescans completed. Used to wait for scans to finish.
* Protected by the CacheReplicationMonitor lock. * Protected by the CacheReplicationMonitor lock.
*/ */
private long scanCount = 0; private long completedScanCount = 0;
/**
* The scan we're currently performing, or -1 if no scan is in progress.
* Protected by the CacheReplicationMonitor lock.
*/
private long curScanCount = -1;
/**
* The number of rescans we need to complete. Protected by the CRM lock.
*/
private long neededScanCount = 0;
/** /**
* True if this monitor should terminate. Protected by the CRM lock. * True if this monitor should terminate. Protected by the CRM lock.
@ -169,7 +169,7 @@ public void run() {
LOG.info("Shutting down CacheReplicationMonitor"); LOG.info("Shutting down CacheReplicationMonitor");
return; return;
} }
if (needsRescan) { if (completedScanCount < neededScanCount) {
LOG.info("Rescanning because of pending operations"); LOG.info("Rescanning because of pending operations");
break; break;
} }
@ -182,8 +182,6 @@ public void run() {
doRescan.await(delta, TimeUnit.MILLISECONDS); doRescan.await(delta, TimeUnit.MILLISECONDS);
curTimeMs = Time.monotonicNow(); curTimeMs = Time.monotonicNow();
} }
isScanning = true;
needsRescan = false;
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -194,8 +192,8 @@ public void run() {
// Update synchronization-related variables. // Update synchronization-related variables.
lock.lock(); lock.lock();
try { try {
isScanning = false; completedScanCount = curScanCount;
scanCount++; curScanCount = -1;
scanFinished.signalAll(); scanFinished.signalAll();
} finally { } finally {
lock.unlock(); lock.unlock();
@ -226,16 +224,15 @@ public void waitForRescanIfNeeded() {
"Must not hold the FSN write lock when waiting for a rescan."); "Must not hold the FSN write lock when waiting for a rescan.");
Preconditions.checkArgument(lock.isHeldByCurrentThread(), Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when waiting for a rescan."); "Must hold the CRM lock when waiting for a rescan.");
if (!needsRescan) { if (neededScanCount <= completedScanCount) {
return; return;
} }
// If no scan is already ongoing, mark the CRM as dirty and kick // If no scan is already ongoing, mark the CRM as dirty and kick
if (!isScanning) { if (curScanCount < 0) {
doRescan.signal(); doRescan.signal();
} }
// Wait until the scan finishes and the count advances // Wait until the scan finishes and the count advances
final long startCount = scanCount; while ((!shutdown) && (completedScanCount < neededScanCount)) {
while ((!shutdown) && (startCount >= scanCount)) {
try { try {
scanFinished.await(); scanFinished.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -253,7 +250,14 @@ public void waitForRescanIfNeeded() {
public void setNeedsRescan() { public void setNeedsRescan() {
Preconditions.checkArgument(lock.isHeldByCurrentThread(), Preconditions.checkArgument(lock.isHeldByCurrentThread(),
"Must hold the CRM lock when setting the needsRescan bit."); "Must hold the CRM lock when setting the needsRescan bit.");
this.needsRescan = true; if (curScanCount >= 0) {
// If there is a scan in progress, we need to wait for the scan after
// that.
neededScanCount = curScanCount + 1;
} else {
// If there is no scan in progress, we need to wait for the next scan.
neededScanCount = completedScanCount + 1;
}
} }
/** /**
@ -284,10 +288,17 @@ private void rescan() throws InterruptedException {
scannedBlocks = 0; scannedBlocks = 0;
namesystem.writeLock(); namesystem.writeLock();
try { try {
lock.lock();
if (shutdown) { if (shutdown) {
throw new InterruptedException("CacheReplicationMonitor was " + throw new InterruptedException("CacheReplicationMonitor was " +
"shut down."); "shut down.");
} }
curScanCount = completedScanCount + 1;
}
finally {
lock.unlock();
}
try {
resetStatistics(); resetStatistics();
rescanCacheDirectives(); rescanCacheDirectives();
rescanCachedBlockMap(); rescanCachedBlockMap();