From 032ccba67c44aa83bd80b0209b4da2204f2f4c5e Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 18 May 2020 08:40:38 -0700 Subject: [PATCH] HDFS-15207. VolumeScanner skip to scan blocks accessed during recent scan period. Contributed by Yang Yun. (cherry picked from commit 50caba1a92cb36ce78307d47ed7624ce216562fc) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++ .../hdfs/server/datanode/BlockScanner.java | 6 +++ .../hdfs/server/datanode/VolumeScanner.java | 22 +++++++++++ .../src/main/resources/hdfs-default.xml | 10 +++++ .../server/datanode/TestBlockScanner.java | 38 +++++++++++++++++++ 5 files changed, 80 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2c943b6037..2e2b7db1d0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -846,6 +846,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks. 
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second"; public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L; + public static final String DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED = + "dfs.block.scanner.skip.recent.accessed"; + public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT = + false; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index 6b1b96fb02..82efcf8643 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; @@ -112,6 +114,7 @@ static class Conf { final long maxStalenessMs; final long scanPeriodMs; final long cursorSaveMs; + final boolean skipRecentAccessed; final Class resultHandler; private static long getUnitTestLong(Configuration conf, String key, @@ -163,6 +166,9 @@ private static long 
getConfiguredScanPeriodMs(Configuration conf) { this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf, INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT)); + this.skipRecentAccessed = conf.getBoolean( + DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED, + DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT); if (allowUnitTestSettings) { this.resultHandler = (Class) conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 84cfb04801..5f1a1e0248 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.DataOutputStream; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedHashSet; @@ -32,6 +35,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; @@ -540,6 +544,24 @@ private long runLoop(ExtendedBlock suspectBlock) { this, curBlockIter.getBlockPoolId()); saveBlockIterator(curBlockIter); return 0; + } else if (conf.skipRecentAccessed) { + // Check the access time of block file to avoid scanning recently + // changed blocks, reducing 
disk IO. + try { + BlockLocalPathInfo blockLocalPathInfo = + volume.getDataset().getBlockLocalPathInfo(block); + BasicFileAttributes attr = Files.readAttributes( + new File(blockLocalPathInfo.getBlockPath()).toPath(), + BasicFileAttributes.class); + if (System.currentTimeMillis() - attr.lastAccessTime(). + to(TimeUnit.MILLISECONDS) < conf.scanPeriodMs) { + return 0; + } + + } catch (IOException ioe) { + LOG.debug("Failed to get access time of block {}", + block, ioe); + } } } if (curBlockIter != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ad4d04394a..7c42e0d879 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1586,6 +1586,16 @@ + + dfs.block.scanner.skip.recent.accessed + false + + If this is true, scanner will check the access time of block file to avoid + scanning blocks accessed during recent scan period, reducing disk IO. + This feature will not work if the DataNode volume has the noatime mount option. 
+ + + dfs.datanode.readahead.bytes 4194304 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index a7d325e7eb..d9727bb737 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS; @@ -25,6 +26,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import java.io.Closeable; import java.io.File; @@ -974,4 +976,40 @@ public Boolean get() { info.blocksScanned = 0; } } + + @Test + public void testSkipRecentAccessFile() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED, true); + conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 2000L); + conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + TestScanResultHandler.class.getName()); + final TestContext ctx = new TestContext(conf, 1); + final int totalBlocks = 5; + ctx.createFiles(0, totalBlocks, 4096); + + final TestScanResultHandler.Info info = + TestScanResultHandler.getInfo(ctx.volumes.get(0)); + synchronized (info) { + info.shouldRun = true; + info.notify(); + } + try { + GenericTestUtils.waitFor(() -> { + synchronized (info) { + return 
info.blocksScanned > 0; + } + }, 10, 500); + fail("Scan nothing for all files are accessed in last period."); + } catch (TimeoutException e) { + LOG.debug("Timeout for all files are accessed in last period."); + } + synchronized (info) { + info.shouldRun = false; + info.notify(); + } + assertEquals("Should not scan block accessed in last period", + 0, info.blocksScanned); + ctx.close(); + } }