diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 3286ffb4f0..1729106ad1 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -271,6 +271,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.redundancy.considerLoad.factor";
   public static final double
       DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT = 2.0;
+  public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY =
+      "dfs.namenode.redundancy.considerLoadByVolume";
+  public static final boolean
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
+      = false;
   public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
   public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 1fef3db69d..3d5ecf9b57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -82,6 +82,7 @@ private enum NodeNotChosenReason {
     NOT_IN_SERVICE("the node is not in service"),
     NODE_STALE("the node is stale"),
     NODE_TOO_BUSY("the node is too busy"),
+    NODE_TOO_BUSY_BY_VOLUME("the node is too busy based on volume load"),
     TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
     NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
     NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
@@ -101,6 +102,7 @@ private String getText() {
   protected boolean considerLoad;
   private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
+  private boolean considerLoadByVolume = false;
   private boolean preferLocalNode;
   private boolean dataNodePeerStatsEnabled;
   private volatile boolean excludeSlowNodesEnabled;
@@ -131,6 +133,10 @@ public void initialize(Configuration conf, FSClusterStats stats,
     this.considerLoadFactor = conf.getDouble(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
+    this.considerLoadByVolume = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
+    );
     this.stats = stats;
     this.clusterMap = clusterMap;
     this.host2datanodeMap = host2datanodeMap;
@@ -1007,6 +1013,16 @@ boolean excludeNodeByLoad(DatanodeDescriptor node){
           "(load: " + nodeLoad + " > " + maxLoad + ")");
       return true;
     }
+    if (considerLoadByVolume) {
+      final int numVolumesAvailable = node.getNumVolumesAvailable();
+      final double maxLoadForVolumes = considerLoadFactor * numVolumesAvailable *
+          stats.getInServiceXceiverAverageForVolume();
+      if (maxLoadForVolumes > 0.0 && nodeLoad > maxLoadForVolumes) {
+        logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY_BY_VOLUME,
+            "(load: " + nodeLoad + " > " + maxLoadForVolumes + ") ");
+        return true;
+      }
+    }
     return false;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index c77d54591a..352238b7f7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -233,6 +233,9 @@ public Type getType() {
   // HB processing can use it to tell if it is the first HB since DN restarted
   private boolean heartbeatedSinceRegistration = false;
 
+  /** The number of volumes that can be written.*/
+  private int numVolumesAvailable = 0;
+
   /**
    * DatanodeDescriptor constructor
    * @param nodeID id of the data node
@@ -411,6 +414,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
     long totalNonDfsUsed = 0;
     Set<String> visitedMount = new HashSet<>();
     Set<DatanodeStorageInfo> failedStorageInfos = null;
+    int volumesAvailable = 0;
 
     // Decide if we should check for any missing StorageReport and mark it as
     // failed. There are different scenarios.
@@ -489,7 +493,11 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
           visitedMount.add(mount);
         }
       }
+      if (report.getRemaining() > 0 && storage.getState() != State.FAILED) {
+        volumesAvailable += 1;
+      }
     }
+    this.numVolumesAvailable = volumesAvailable;
 
     // Update total metrics for the node.
     setCapacity(totalCapacity);
@@ -981,6 +989,14 @@ public VolumeFailureSummary getVolumeFailureSummary() {
     return volumeFailureSummary;
   }
 
+  /**
+   * Return the number of volumes that can be written.
+   * @return the number of volumes that can be written.
+   */
+  public int getNumVolumesAvailable() {
+    return numVolumesAvailable;
+  }
+
   /**
    * @param nodeReg DatanodeID to update registration for.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 88f3ac4e7c..ed60f388d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -2101,6 +2101,17 @@ public double getInServiceXceiverAverage() {
     return avgLoad;
   }
 
+  @Override
+  public double getInServiceXceiverAverageForVolume() {
+    double avgLoad = 0;
+    final int volumes = heartbeatManager.getInServiceAvailableVolumeCount();
+    if (volumes > 0) {
+      final long xceivers = heartbeatManager.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/volumes;
+    }
+    return avgLoad;
+  }
+
   @Override
   public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
     return heartbeatManager.getStorageTypeStats();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
index 36a9c2bc09..fcf86195bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
@@ -60,7 +60,9 @@ public interface DatanodeStatistics {
 
   /** @return number of non-decommission(ing|ed) nodes */
   public int getNumDatanodesInService();
-
+
+  /** @return number of writable volumes on in-service datanodes. */
+  int getInServiceAvailableVolumeCount();
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
index 912d4d236a..5bd88b561a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
@@ -44,6 +44,7 @@ class DatanodeStats {
 
   private int nodesInService = 0;
   private int nodesInServiceXceiverCount = 0;
+  private int nodesInServiceAvailableVolumeCount = 0;
   private int expiredHeartbeats = 0;
 
   synchronized void add(final DatanodeDescriptor node) {
@@ -58,6 +59,7 @@ synchronized void add(final DatanodeDescriptor node) {
       capacityRemaining += node.getRemaining();
       cacheCapacity += node.getCacheCapacity();
       cacheUsed += node.getCacheUsed();
+      nodesInServiceAvailableVolumeCount += node.getNumVolumesAvailable();
     } else if (node.isDecommissionInProgress() ||
         node.isEnteringMaintenance()) {
       cacheCapacity += node.getCacheCapacity();
@@ -87,6 +89,7 @@ synchronized void subtract(final DatanodeDescriptor node) {
       capacityRemaining -= node.getRemaining();
       cacheCapacity -= node.getCacheCapacity();
       cacheUsed -= node.getCacheUsed();
+      nodesInServiceAvailableVolumeCount -= node.getNumVolumesAvailable();
     } else if (node.isDecommissionInProgress() ||
         node.isEnteringMaintenance()) {
       cacheCapacity -= node.getCacheCapacity();
@@ -149,6 +152,10 @@ synchronized int getNodesInServiceXceiverCount() {
     return nodesInServiceXceiverCount;
   }
 
+  synchronized int getNodesInServiceAvailableVolumeCount() {
+    return nodesInServiceAvailableVolumeCount;
+  }
+
   synchronized int getExpiredHeartbeats() {
     return expiredHeartbeats;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
index 14122952bb..217dd36e3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -53,14 +53,24 @@ public interface FSClusterStats {
   public int getNumDatanodesInService();
 
   /**
-   * an indication of the average load of non-decommission(ing|ed) nodes
-   * eligible for block placement
+   * An indication of the average load of non-decommission(ing|ed) nodes
+   * eligible for block placement.
    *
    * @return average of the in service number of block transfers and block
    *         writes that are currently occurring on the cluster.
    */
   public double getInServiceXceiverAverage();
 
+  /**
+   * An indication of the average load of volumes at non-decommission(ing|ed)
+   * nodes eligible for block placement.
+   *
+   * @return average of in service number of block transfers and block
+   *         writes that are currently occurring on the volumes of the
+   *         cluster.
+   */
+  double getInServiceXceiverAverageForVolume();
+
   /**
    * Indicates the storage statistics per storage type.
    * @return storage statistics per storage type.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 01e1b6392a..429d40d9fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -183,6 +183,11 @@ public int getInServiceXceiverCount() {
   public int getNumDatanodesInService() {
     return stats.getNodesInService();
   }
+
+  @Override
+  public int getInServiceAvailableVolumeCount() {
+    return stats.getNodesInServiceAvailableVolumeCount();
+  }
 
   @Override
   public long getCacheCapacity() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bdd048004d..8e6ef99040 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -334,6 +334,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.redundancy.considerLoadByVolume</name>
+  <value>false</value>
+  <description>Decide if chooseTarget considers the target's volume load or
+    not.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.read.considerLoad</name>
   <value>false</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
index 1e75452d3d..c9eb624e5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java
@@ -56,13 +56,13 @@ abstract public class BaseReplicationPolicyTest {
   protected String blockPlacementPolicy;
   protected NamenodeProtocols nameNodeRpc = null;
 
-  static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+  void updateHeartbeatWithUsage(DatanodeDescriptor dn,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
       int volFailures) {
     dn.getStorageInfos()[0].setUtilizationForTesting(
         capacity, dfsUsed, remaining, blockPoolUsed);
-    dn.updateHeartbeat(
+    dnManager.getHeartbeatManager().updateHeartbeat(dn,
         BlockManagerTestUtil.getStorageReportsForDatanode(dn),
         dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyRatioConsiderLoadWithStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyRatioConsiderLoadWithStorage.java
new file mode 100644
index 0000000000..d06af05469
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyRatioConsiderLoadWithStorage.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verify that chooseTarget can exclude nodes with high volume average load.
+ */
+public class TestReplicationPolicyRatioConsiderLoadWithStorage
+    extends BaseReplicationPolicyTest {
+
+  public TestReplicationPolicyRatioConsiderLoadWithStorage() {
+    this.blockPlacementPolicy = BlockPlacementPolicyDefault.class.getName();
+  }
+
+  @Override
+  DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) {
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        true);
+    conf.setDouble(DFSConfigKeys
+        .DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR, 2);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY, true);
+
+    final String[] racks = {
+        "/rack1",
+        "/rack2",
+        "/rack3",
+        "/rack4",
+        "/rack5"};
+    storages = DFSTestUtil.createDatanodeStorageInfos(racks);
+    DatanodeDescriptor[] descriptors =
+        DFSTestUtil.toDatanodeDescriptor(storages);
+    long storageCapacity =
+        2 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE;
+    // Each datanode has 6 storages, but the number of available storages
+    // varies.
+    for (int i = 0; i < descriptors.length; i++) {
+      for (int j = 0; j < 5; j++) {
+        DatanodeStorage s =
+            new DatanodeStorage("s" + i + j);
+        descriptors[i].updateStorage(s);
+
+      }
+      for (int j = 0; j < descriptors[i].getStorageInfos().length; j++) {
+        DatanodeStorageInfo dsInfo = descriptors[i].getStorageInfos()[j];
+        if (j > i + 1) {
+          dsInfo.setUtilizationForTesting(storageCapacity, storageCapacity, 0,
+              storageCapacity);
+        } else {
+          dsInfo.setUtilizationForTesting(storageCapacity, 0, storageCapacity,
+              0);
+        }
+      }
+    }
+    return descriptors;
+  }
+
+  /**
+   * Tests that chooseTarget, with considerLoad and considerLoadByVolume set
+   * to true, correctly calculates the load.
+   */
+  @Test
+  public void testChooseTargetWithRatioConsiderLoad() {
+    namenode.getNamesystem().writeLock();
+    try {
+      // After the heartbeats have been processed, the total load is 200 and
+      // the average load per node is 40, so the per-node limit is 2 * 40.
+      // The average load per storage is 10. Considering the available
+      // storages, the per-node volume limits are:
+      // 2*10*2, 3*10*2, 4*10*2, 5*10*2, 6*10*2.
+      // Considering the load of every node and its number of storages:
+      // Index:             0,   1,   2,   3,   4
+      // Available Storage: 2,   3,   4,   5,   6
+      // Load:              50, 110,  28,   2,  10
+      // So dataNodes[1] should never be chosen because the node itself is
+      // overloaded, and dataNodes[0] should never be chosen because its
+      // per-storage load is too high.
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[0],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[0]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          50, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[1],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[1]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          110, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[2],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[2]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          28, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          2, 0, null);
+      dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4],
+          BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]),
+          dataNodes[0].getCacheCapacity(),
+          dataNodes[0].getCacheUsed(),
+          10, 0, null);
+
+      Set<DatanodeDescriptor> targetSet = new HashSet<>();
+
+      // Try to choose 3 datanode targets.
+      DatanodeDescriptor writerDn = dataNodes[2];
+      DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy()
+          .chooseTarget("testFile.txt", 3, writerDn, new ArrayList<>(), false,
+              null, 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+      // The result should contain dataNodes[2], dataNodes[3] and dataNodes[4].
+      assertEquals(3, targets.length);
+      for (DatanodeStorageInfo dsi : targets) {
+        targetSet.add(dsi.getDatanodeDescriptor());
+      }
+      assertTrue(targetSet.contains(dataNodes[2]));
+      assertTrue(targetSet.contains(dataNodes[3]));
+      assertTrue(targetSet.contains(dataNodes[4]));
+
+      // Try to choose 4 datanode targets.
+      targets = namenode.getNamesystem().getBlockManager()
+          .getBlockPlacementPolicy()
+          .chooseTarget("testFile.txt", 4, writerDn, new ArrayList<>(), false,
+              null, 1024, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+      // Again only dataNodes[2], dataNodes[3] and dataNodes[4] should be chosen.
+      assertEquals(3, targets.length);
+      targetSet.clear();
+      for (DatanodeStorageInfo dsi : targets) {
+        targetSet.add(dsi.getDatanodeDescriptor());
+      }
+      assertTrue(targetSet.contains(dataNodes[2]));
+      assertTrue(targetSet.contains(dataNodes[3]));
+      assertTrue(targetSet.contains(dataNodes[4]));
+    } finally {
+      namenode.getNamesystem().writeUnlock();
+    }
+  }
+}
\ No newline at end of file
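Note (illustrative only, not part of the patch): the check added to excludeNodeByLoad() rejects a node when its xceiver count exceeds considerLoadFactor * numVolumesAvailable * getInServiceXceiverAverageForVolume(). A minimal standalone sketch of that arithmetic, using the numbers from the test above, is shown below; the class and method names here are hypothetical and exist only for this example.

public final class VolumeLoadCheckSketch {

  // Mirrors the patched check: a node is "too busy by volume" when its
  // xceiver count exceeds factor * (writable volumes on the node) *
  // (cluster-average xceivers per writable volume).
  static boolean tooBusyByVolume(int nodeXceivers, int volumesOnNode,
      long clusterXceivers, int clusterAvailableVolumes, double factor) {
    if (clusterAvailableVolumes <= 0) {
      return false;
    }
    double avgXceiversPerVolume =
        (double) clusterXceivers / clusterAvailableVolumes;
    double maxLoadForVolumes = factor * volumesOnNode * avgXceiversPerVolume;
    return maxLoadForVolumes > 0.0 && nodeXceivers > maxLoadForVolumes;
  }

  public static void main(String[] args) {
    // Cluster from the test: 200 xceivers over 20 writable volumes,
    // i.e. an average of 10 xceivers per volume, with factor = 2.
    long clusterXceivers = 200;
    int clusterVolumes = 20;
    double factor = 2.0;
    // dataNodes[0]: 50 xceivers, 2 writable volumes -> limit 2 * 2 * 10 = 40.
    System.out.println(
        tooBusyByVolume(50, 2, clusterXceivers, clusterVolumes, factor)); // true (excluded)
    // dataNodes[2]: 28 xceivers, 4 writable volumes -> limit 2 * 4 * 10 = 80.
    System.out.println(
        tooBusyByVolume(28, 4, clusterXceivers, clusterVolumes, factor)); // false (kept)
  }
}

This is why the test expects dataNodes[0] and dataNodes[1] to be skipped even when four targets are requested: dataNodes[1] trips the existing per-node limit, and dataNodes[0] trips the new per-volume limit.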