diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8b7b6cf09d..14f9cd7730 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -701,6 +701,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.datanode.max.disks.to.report";
   public static final int DFS_DATANODE_MAX_DISKS_TO_REPORT_DEFAULT =
       5;
+  public static final String DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY =
+      "dfs.datanode.max.slowdisks.to.exclude";
+  public static final int DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT =
+      0;
   public static final String DFS_DATANODE_HOST_NAME_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_HOST_NAME_KEY;
   public static final String DFS_NAMENODE_CHECKPOINT_DIR_KEY =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8d37bda166..776e28594f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -374,7 +374,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
-        blockChooserImpl, conf);
+        blockChooserImpl, conf, datanode.getDiskMetrics());
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 9400c7c7f4..95470bb8ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -43,6 +44,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
@@ -67,15 +69,17 @@ class FsVolumeList {
   private final boolean enableSameDiskTiering;
   private final MountVolumeMap mountVolumeMap;
   private Map capacityRatioMap;
+  private final DataNodeDiskMetrics diskMetrics;
 
   FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
       BlockScanner blockScanner,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
-      Configuration config) {
+      Configuration config, DataNodeDiskMetrics dataNodeDiskMetrics) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
     this.checkDirsLock = new AutoCloseableLock();
     this.checkDirsLockCondition = checkDirsLock.newCondition();
+    this.diskMetrics = dataNodeDiskMetrics;
     for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
@@ -100,6 +104,15 @@ List<FsVolumeImpl> getVolumes() {
 
   private FsVolumeReference chooseVolume(List<FsVolumeImpl> list,
       long blockSize, String storageId) throws IOException {
+
+    // Exclude slow disks when choosing volume.
+    if (diskMetrics != null) {
+      List<String> slowDisksToExclude = diskMetrics.getSlowDisksToExclude();
+      list = list.stream()
+          .filter(volume -> !slowDisksToExclude.contains(volume.getBaseURI().getPath()))
+          .collect(Collectors.toList());
+    }
+
     while (true) {
       FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize,
           storageId);
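The chooseVolume() change above drops any volume whose base path appears in the DataNode's current slow-disk exclusion list before the configured VolumeChoosingPolicy runs. A minimal standalone sketch of the same idea, for illustration only: plain strings stand in for FsVolumeImpl base paths, and the hard-coded list plays the role of DataNodeDiskMetrics#getSlowDisksToExclude().

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only, not part of the patch.
public class SlowDiskFilterSketch {
  // Remove candidate volume paths that are currently flagged as slow.
  static List<String> excludeSlowDisks(List<String> volumePaths,
      List<String> slowDisksToExclude) {
    return volumePaths.stream()
        .filter(path -> !slowDisksToExclude.contains(path))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> volumes = Arrays.asList("/data/disk0/", "/data/disk1/", "/data/disk2/");
    List<String> slowDisks = Arrays.asList("/data/disk0/");
    // Prints [/data/disk1/, /data/disk2/]; the volume choosing policy then only
    // sees the remaining volumes.
    System.out.println(excludeSlowDisks(volumes, slowDisks));
  }
}

Note that the filter in the patch builds a new list rather than mutating the FsVolumeList's own volume collection, so the exclusion only affects the current placement decision.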
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
index cae464bfae..9c2151c775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java
@@ -34,9 +34,13 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * This class detects and maintains DataNode disk outliers and their
@@ -69,6 +73,14 @@ public class DataNodeDiskMetrics {
    * Threshold in milliseconds below which a disk is definitely not slow.
    */
   private final long lowThresholdMs;
+  /**
+   * The number of slow disks that need to be excluded.
+   */
+  private int maxSlowDisksToExclude;
+  /**
+   * List of slow disks that need to be excluded.
+   */
+  private List<String> slowDisksToExclude = new ArrayList<>();
 
   public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs,
       Configuration conf) {
@@ -80,6 +92,9 @@ public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs,
     lowThresholdMs =
         conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
             DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_DEFAULT);
+    maxSlowDisksToExclude =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
+            DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT);
     slowDiskDetector =
         new OutlierDetector(minOutlierDetectionDisks, lowThresholdMs);
     shouldRun = true;
@@ -127,6 +142,21 @@ public void run() {
 
           detectAndUpdateDiskOutliers(metadataOpStats, readIoStats,
               writeIoStats);
+
+          // Sort the slow disks by latency and extract the top N by maxSlowDisksToExclude.
+          if (maxSlowDisksToExclude > 0) {
+            ArrayList<DiskLatency> diskLatencies = new ArrayList<>();
+            for (Map.Entry<String, Map<DiskOp, Double>> diskStats :
+                diskOutliersStats.entrySet()) {
+              diskLatencies.add(new DiskLatency(diskStats.getKey(), diskStats.getValue()));
+            }
+
+            Collections.sort(diskLatencies, (o1, o2)
+                -> Double.compare(o2.getMaxLatency(), o1.getMaxLatency()));
+
+            slowDisksToExclude = diskLatencies.stream().limit(maxSlowDisksToExclude)
+                .map(DiskLatency::getSlowDisk).collect(Collectors.toList());
+          }
         }
 
         try {
@@ -171,6 +201,35 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
     }
   }
 
+  /**
+   * This structure is a wrapper over disk latencies.
+   */
+  public static class DiskLatency {
+    final private String slowDisk;
+    final private Map<DiskOp, Double> latencyMap;
+
+    public DiskLatency(
+        String slowDiskID,
+        Map<DiskOp, Double> latencyMap) {
+      this.slowDisk = slowDiskID;
+      this.latencyMap = latencyMap;
+    }
+
+    double getMaxLatency() {
+      double maxLatency = 0;
+      for (double latency : latencyMap.values()) {
+        if (latency > maxLatency) {
+          maxLatency = latency;
+        }
+      }
+      return maxLatency;
+    }
+
+    public String getSlowDisk() {
+      return slowDisk;
+    }
+  }
+
   private void addDiskStat(Map<String, Map<DiskOp, Double>> diskStats,
       String disk, DiskOp diskOp, double latency) {
     if (!diskStats.containsKey(disk)) {
@@ -206,4 +265,8 @@ public void addSlowDiskForTesting(String slowDiskPath,
       diskOutliersStats.put(slowDiskPath, latencies);
     }
   }
+
+  public List<String> getSlowDisksToExclude() {
+    return slowDisksToExclude;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4702d9c3fe..e766a13787 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2483,6 +2483,15 @@
 
+<property>
+  <name>dfs.datanode.max.slowdisks.to.exclude</name>
+  <value>0</value>
+  <description>
+    The number of slow disks that need to be excluded. By default, this parameter is set to 0,
+    which disables excluding slow disks when choosing a volume.
+  </description>
+</property>
+
 <property>
   <name>hadoop.user.group.metrics.percentiles.intervals</name>
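As a usage sketch (not part of the patch), the new key can be set in hdfs-site.xml or programmatically through the Configuration API. This assumes the patched hadoop-hdfs is on the classpath so the new DFSConfigKeys constants exist; the class name is purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Illustrative sketch only, assuming this patch is applied.
public class EnableSlowDiskExclusion {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Exclude at most one slow disk per DataNode when choosing a volume;
    // the default of 0 leaves the feature disabled.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
        DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_DEFAULT));
  }
}

A small value such as 1 or 2 is typically enough, since only disks already flagged as outliers by the slow-disk detector are candidates for exclusion.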
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index 15495dfd59..4d8e0c9998 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -32,12 +33,16 @@
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -53,6 +58,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
@@ -73,6 +79,10 @@ public class TestFsVolumeList {
   private FsDatasetImpl dataset = null;
   private String baseDir;
   private BlockScanner blockScanner;
+  private final static int NUM_DATANODES = 3;
+  private final static int STORAGES_PER_DATANODE = 3;
+  private final static int DEFAULT_BLOCK_SIZE = 102400;
+  private final static int BUFFER_LENGTH = 1024;
 
   @Before
   public void setUp() {
@@ -89,7 +99,7 @@ public void setUp() {
   public void testGetNextVolumeWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
         Collections.emptyList(),
-        blockScanner, blockChooser, conf);
+        blockScanner, blockChooser, conf, null);
     final List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);
@@ -132,7 +142,7 @@ public Boolean get() {
   @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.emptyList(), null, blockChooser, conf);
+        Collections.emptyList(), null, blockChooser, conf, null);
     File volDir = new File(baseDir, "volume-0");
     volDir.mkdirs();
     FsVolumeImpl volume = new FsVolumeImplBuilder()
@@ -511,7 +521,7 @@ public void testGetVolumeWithSameDiskArchival() throws Exception {
         .build();
     FsVolumeList volumeList = new FsVolumeList(
         Collections.emptyList(),
-        blockScanner, blockChooser, conf);
+        blockScanner, blockChooser, conf, null);
 
     volumeList.addVolume(archivalVolume.obtainReference());
     volumeList.addVolume(diskVolume.obtainReference());
@@ -620,4 +630,99 @@ public void testDfsUsageStatWithSameDiskArchival() throws Exception {
     mountVolumeMap.removeVolume(spyArchivalVolume);
     assertEquals(dfCapacity - duReserved, spyDiskVolume.getCapacity());
   }
+
+  @Test
+  public void testExcludeSlowDiskWhenChoosingVolume() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    // Set datanode outliers report interval to 1s.
+    conf.setStrings(DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, "1s");
+    // Enable datanode disk metrics collector.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY, 30);
+    // Enable excluding slow disks when choosing volume.
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY, 1);
+    // Ensure that each volume capacity is larger than the DEFAULT_BLOCK_SIZE.
+    long capacity = 10 * DEFAULT_BLOCK_SIZE;
+    long[][] capacities = new long[NUM_DATANODES][STORAGES_PER_DATANODE];
+    String[] hostnames = new String[NUM_DATANODES];
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      hostnames[i] = i + "." + i + "." + i + "." + i;
+      for (int j = 0; j < STORAGES_PER_DATANODE; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .hosts(hostnames)
+        .numDataNodes(NUM_DATANODES)
+        .storagesPerDatanode(STORAGES_PER_DATANODE)
+        .storageCapacities(capacities).build();
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+
+    // Get the datanodes under test.
+    ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+    DataNode dn0 = dataNodes.get(0);
+    DataNode dn1 = dataNodes.get(1);
+    DataNode dn2 = dataNodes.get(2);
+
+    // Mock the first disk of each datanode as the slowest disk.
+    String slowDisk0OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk0OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk0OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(0)
+        .getVolume().getBaseURI().getPath();
+
+    String slowDisk1OnDn0 = dn0.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk1OnDn1 = dn1.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+    String slowDisk1OnDn2 = dn2.getFSDataset().getFsVolumeReferences().getReference(1)
+        .getVolume().getBaseURI().getPath();
+
+    dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn0, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+    dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn1, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+    dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk0OnDn2, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.5,
+        SlowDiskReports.DiskOp.METADATA, 2.0));
+
+    dn0.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn0, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+    dn1.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn1, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+    dn2.getDiskMetrics().addSlowDiskForTesting(slowDisk1OnDn2, ImmutableMap.of(
+        SlowDiskReports.DiskOp.READ, 1.0, SlowDiskReports.DiskOp.WRITE, 1.0,
+        SlowDiskReports.DiskOp.METADATA, 1.0));
+
+    // Wait until the slow disk data has been collected successfully.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        return dn0.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
+            dn1.getDiskMetrics().getSlowDisksToExclude().size() == 1 &&
+            dn2.getDiskMetrics().getSlowDisksToExclude().size() == 1;
+      }
+    }, 1000, 5000);
+
+    // Create a file with 3 replicas.
+    DFSTestUtil.createFile(fs, new Path("/file0"), false, BUFFER_LENGTH, 1000,
+        DEFAULT_BLOCK_SIZE, (short) 3, 0, false, null);
+
+    // Assert that the number of blocks created on each slowest disk is 0.
+    Assert.assertEquals(0, dn0.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn0)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+    Assert.assertEquals(0, dn1.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn1)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+    Assert.assertEquals(0, dn2.getVolumeReport().stream()
+        .filter(v -> (v.getPath() + "/").equals(slowDisk0OnDn2)).collect(Collectors.toList()).get(0)
+        .getNumBlocks());
+  }
 }
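To make the selection rule used by DataNodeDiskMetrics concrete: each detected outlier disk is ranked by the maximum latency across its op types, and only the top maxSlowDisksToExclude entries are excluded. Below is a self-contained sketch of that ranking, for illustration only: plain maps and string op names stand in for the DiskLatency wrapper and the DiskOp enum.

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only, not part of the patch.
public class TopSlowDisksSketch {
  // Rank disks by their worst per-op latency and keep the top N.
  static List<String> topSlowDisks(Map<String, Map<String, Double>> latenciesByDisk,
      int maxSlowDisksToExclude) {
    return latenciesByDisk.entrySet().stream()
        .sorted(Comparator.comparingDouble(
            (Map.Entry<String, Map<String, Double>> e) -> e.getValue().values().stream()
                .mapToDouble(Double::doubleValue).max().orElse(0))
            .reversed())
        .limit(maxSlowDisksToExclude)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<String, Map<String, Double>> stats = Map.of(
        "/data/disk0/", Map.of("READ", 1.0, "WRITE", 1.5, "METADATA", 2.0),
        "/data/disk1/", Map.of("READ", 1.0, "WRITE", 1.0, "METADATA", 1.0));
    // Prints [/data/disk0/] because its worst-case latency (2.0) is the highest.
    System.out.println(topSlowDisks(stats, 1));
  }
}

Run against the latencies injected in the test above, this picks the disk whose worst op latency is 2.0, which matches the single entry per DataNode that the test waits for in getSlowDisksToExclude().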