diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 378d1a130a..bb652ce686 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -725,6 +725,9 @@ Branch-2 ( Unreleased changes )
     (Vinay via umamahesh)
 
     HDFS-1490. TransferFSImage should timeout (Dmytro Molkov and Vinay via todd)
+
+    HDFS-3828. Block Scanner rescans blocks too frequently.
+    (Andy Isaacson via eli)
 
   BREAKDOWN OF HDFS-3042 SUBTASKS
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index 55e8ec63b0..757c0d79c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -51,6 +51,8 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Scans the block files under a block pool and verifies that the
  * files are not corrupt.
@@ -255,6 +257,11 @@ synchronized void deleteBlock(Block block) {
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans() {
+    return totalScans;
+  }
+
   /** @return the last scan time for the block pool. */
   long getLastScanTime() {
     return lastScanTime.get();
@@ -563,7 +570,24 @@ private synchronized void startNewPeriod() {
     currentPeriodStart = Time.now();
   }
 
+  private synchronized boolean workRemainingInCurrentPeriod() {
+    if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
+                  currentPeriodStart + ", period=" + scanPeriod + ", now=" +
+                  Time.now() + " " + blockPoolId);
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
   void scanBlockPoolSlice() {
+    if (!workRemainingInCurrentPeriod()) {
+      return;
+    }
+
     // Create a new processedBlocks structure
     processedBlocks = new HashMap<Long, Integer>();
     if (!assignInitialVerificationTimes()) {
@@ -608,14 +632,14 @@ private void scan() {
       LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
       throw e;
     } finally {
-      cleanUp();
+      rollVerificationLogs();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Done scanning block pool: " + blockPoolId);
       }
     }
   }
 
-  private synchronized void cleanUp() {
+  private synchronized void rollVerificationLogs() {
     if (verificationLog != null) {
       try {
         verificationLog.logs.roll();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
index e189c81b88..4a2b76296b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
@@ -34,6 +34,8 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * DataBlockScanner manages block scanning for all the block pools. For each
 * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
@@ -47,6 +49,8 @@ public class DataBlockScanner implements Runnable {
   private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
   private final Configuration conf;
 
+  static final int SLEEP_PERIOD_MS = 5 * 1000;
+
   /**
    * Map to find the BlockPoolScanner for a given block pool id. This is updated
    * when a BPOfferService becomes alive or dies.
@@ -68,10 +72,10 @@ public void run() {
     String currentBpId = "";
     boolean firstRun = true;
     while (datanode.shouldRun && !Thread.interrupted()) {
-      //Sleep everytime except in the first interation.
+      // Sleep every time except in the first iteration.
       if (!firstRun) {
         try {
-          Thread.sleep(5000);
+          Thread.sleep(SLEEP_PERIOD_MS);
         } catch (InterruptedException ex) {
           // Interrupt itself again to set the interrupt status
           blockScannerThread.interrupt();
@@ -103,7 +107,7 @@ private void waitForInit() {
     while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
         || (getBlockPoolSetSize() < 1)) {
       try {
-        Thread.sleep(5000);
+        Thread.sleep(SLEEP_PERIOD_MS);
       } catch (InterruptedException e) {
         blockScannerThread.interrupt();
         return;
@@ -249,7 +253,7 @@ public synchronized void removeBlockPool(String blockPoolId) {
     LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
   }
 
-  // This method is used for testing
+  @VisibleForTesting
   long getBlocksScannedInLastRun(String bpid) throws IOException {
     BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
     if (bpScanner == null) {
@@ -259,6 +263,16 @@ long getBlocksScannedInLastRun(String bpid) throws IOException {
     }
   }
 
+  @VisibleForTesting
+  long getTotalScans(String bpid) throws IOException {
+    BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
+    if (bpScanner == null) {
+      throw new IOException("Block Pool: "+bpid+" is not running");
+    } else {
+      return bpScanner.getTotalScans();
+    }
+  }
+
   public void start() {
     blockScannerThread = new Thread(this);
     blockScannerThread.setDaemon(true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
index 9b52252f36..67964e1841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
@@ -20,8 +20,11 @@
 
 import java.io.IOException;
 
+import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,7 +34,13 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner;
+import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.Test;
+import org.junit.Ignore;
+import static org.junit.Assert.fail;
 
 public class TestMultipleNNDataBlockScanner {
 
@@ -166,4 +175,75 @@ public void testBlockScannerAfterRestart() throws IOException,
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void test2NNBlockRescanInterval() throws IOException {
+    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
+    Configuration conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
+        .build();
+
+    try {
+      FileSystem fs = cluster.getFileSystem(1);
+      Path file2 = new Path("/test/testBlockScanInterval");
+      DFSTestUtil.createFile(fs, file2, 30, (short) 1, 0);
+
+      fs = cluster.getFileSystem(0);
+      Path file1 = new Path("/test/testBlockScanInterval");
+      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
+      for (int i = 0; i < 8; i++) {
+        LOG.info("Verifying that the block scanner scans exactly once");
+        waitAndScanBlocks(1, 1);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * HDFS-3828: DN rescans blocks too frequently.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testBlockRescanInterval() throws IOException {
+    ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
+    Configuration conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      Path file1 = new Path("/test/testBlockScanInterval");
+      DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
+      for (int i = 0; i < 4; i++) {
+        LOG.info("Verifying that the block scanner scans exactly once");
+        waitAndScanBlocks(1, 1);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  void waitAndScanBlocks(long scansLastRun, long scansTotal)
+      throws IOException {
+    // The DataBlockScanner thread wakes up every SLEEP_PERIOD_MS (5 seconds),
+    // so poll the counters at that interval.
+    int n = 5;
+    String bpid = cluster.getNamesystem(0).getBlockPoolId();
+    DataNode dn = cluster.getDataNodes().get(0);
+    long blocksScanned, total;
+    do {
+      try {
+        Thread.sleep(SLEEP_PERIOD_MS);
+      } catch (InterruptedException e) {
+        fail("Interrupted: " + e);
+      }
+      blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpid);
+      total = dn.blockScanner.getTotalScans(bpid);
+      LOG.info("bpid = " + bpid + " blocksScanned = " + blocksScanned + " total=" + total);
+    } while (n-- > 0 && (blocksScanned != scansLastRun || scansTotal != total));
+    Assert.assertEquals(scansTotal, total);
+    Assert.assertEquals(scansLastRun, blocksScanned);
+  }
 }
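
The crux of the patch is the new workRemainingInCurrentPeriod() gate: once every byte in the block pool has been verified (bytesLeft <= 0) and the configured scan period has not yet elapsed, scanBlockPoolSlice() returns immediately instead of starting another full pass on each 5-second wakeup of the scanner thread. Below is a minimal standalone sketch of that gating logic, runnable outside Hadoop; ScanPeriodGate and its harness are illustrative names, not code from the patch.

// Minimal sketch of the scan-period gate added by HDFS-3828; ScanPeriodGate
// is an illustrative name, not a class in HDFS.
public class ScanPeriodGate {
  private final long scanPeriodMs;          // dfs.datanode.scan.period.hours, in millis
  private final long currentPeriodStartMs;  // when the current scan period began
  private long bytesLeft;                   // bytes still unverified in this period

  ScanPeriodGate(long scanPeriodMs, long nowMs, long bytesToScan) {
    this.scanPeriodMs = scanPeriodMs;
    this.currentPeriodStartMs = nowMs;
    this.bytesLeft = bytesToScan;
  }

  void markScanComplete() {  // all blocks verified for this period
    bytesLeft = 0;
  }

  // Same shape as the patch's workRemainingInCurrentPeriod(): report no work
  // only when everything is verified and the period has not yet rolled over.
  synchronized boolean workRemainingInCurrentPeriod(long nowMs) {
    return !(bytesLeft <= 0 && nowMs < currentPeriodStartMs + scanPeriodMs);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long threeWeeksMs = 21L * 24 * 60 * 60 * 1000;
    ScanPeriodGate gate = new ScanPeriodGate(threeWeeksMs, now, 30);

    System.out.println(gate.workRemainingInCurrentPeriod(now));        // true: 30 bytes left
    gate.markScanComplete();
    // Pre-patch, the next 5-second wakeup would rescan anyway; with the gate
    // there is nothing to do until the period rolls over.
    System.out.println(gate.workRemainingInCurrentPeriod(now + 5000)); // false: skip rescan
    System.out.println(gate.workRemainingInCurrentPeriod(now + threeWeeksMs)); // true: new period due
  }
}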
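
A second pattern worth noting is the test helper waitAndScanBlocks above: rather than asserting after a single fixed sleep, it re-samples the two scanner counters every SLEEP_PERIOD_MS for a bounded number of attempts and asserts only once they settle, which keeps the test robust against scanner-thread timing. A generic sketch of that poll-until-expected idiom (PollUntil and poll are made-up names, not HDFS test APIs):

import java.util.function.LongSupplier;

public final class PollUntil {
  // Re-sample 'actual' every intervalMs until it equals 'expected' or the
  // attempt budget runs out; return the last sampled value for asserting.
  static long poll(LongSupplier actual, long expected, int attempts,
      long intervalMs) throws InterruptedException {
    long value;
    do {
      Thread.sleep(intervalMs);
      value = actual.getAsLong();
    } while (--attempts > 0 && value != expected);
    return value;
  }

  public static void main(String[] args) throws InterruptedException {
    long[] counter = {0};
    // Fake counter that needs a few samples to reach the expected value.
    long last = poll(() -> ++counter[0], 3, 10, 10L);
    System.out.println("settled at " + last);  // prints: settled at 3
  }
}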