diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 6e0d88f2a7..137c940001 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -101,6 +101,13 @@ protected NetworkTopology init(InnerNode.Factory factory) {
   private int depthOfAllLeaves = -1;
   /** rack counter */
   protected int numOfRacks = 0;
+  /** Rack map, rackname -> set of node names; used to track empty racks. */
+  private HashMap<String, Set<String>> rackMap =
+      new HashMap<String, Set<String>>();
+  /** Decommissioned nodes, including stopped nodes. */
+  private HashSet<String> decommissionNodes = new HashSet<>();
+  /** empty rack counter. */
+  private int numOfEmptyRacks = 0;
 
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
@@ -150,6 +157,7 @@ public void add(Node node) {
         if (rack == null) {
           incrementRacks();
         }
+        interAddNodeWithEmptyRack(node);
         if (depthOfAllLeaves == -1) {
           depthOfAllLeaves = node.getLevel();
         }
@@ -224,6 +232,7 @@ public void remove(Node node) {
         if (rack == null) {
           numOfRacks--;
         }
+        interRemoveNodeWithEmptyRack(node);
       }
       LOG.debug("NetworkTopology became:\n{}", this);
     } finally {
@@ -1015,4 +1024,108 @@ protected static boolean isNodeInScope(Node node, String scope) {
     String nodeLocation = NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR;
     return nodeLocation.startsWith(scope);
   }
-}
\ No newline at end of file
+
+  /** @return the number of non-empty racks. */
+  public int getNumOfNonEmptyRacks() {
+    return numOfRacks - numOfEmptyRacks;
+  }
+
+  /**
+   * Update the empty rack count when a node is added back, e.g. on recommission.
+   * @param node node to be added; can be null
+   */
+  public void recommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.remove(node.getName());
+      interAddNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Update the empty rack count when a node is removed, e.g. on decommission.
+   * @param node node to be removed; can be null
+   */
+  public void decommissionNode(Node node) {
+    if (node == null) {
+      return;
+    }
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allow to remove an inner node: " + NodeBase.getPath(node));
+    }
+    netlock.writeLock().lock();
+    try {
+      decommissionNodes.add(node.getName());
+      interRemoveNodeWithEmptyRack(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Internal helper that updates the empty rack count
+   * when a node is added or recommissioned.
+   * @param node node to be added; can be null
+   */
+  private void interAddNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes == null) {
+      nodes = new HashSet<String>();
+    }
+    if (!decommissionNodes.contains(node.getName())) {
+      nodes.add(node.getName());
+    }
+    rackMap.put(rackname, nodes);
+    countEmptyRacks();
+  }
+
+  /**
+   * Internal helper that updates the empty rack count
+   * when a node is removed or decommissioned.
+   * @param node node to be removed; can be null
+   */
+  private void interRemoveNodeWithEmptyRack(Node node) {
+    if (node == null) {
+      return;
+    }
+    String rackname = node.getNetworkLocation();
+    Set<String> nodes = rackMap.get(rackname);
+    if (nodes != null) {
+      InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
+      if (rack == null) {
+        // this node and its rack are both removed.
+        rackMap.remove(rackname);
+      } else if (nodes.contains(node.getName())) {
+        // this node is decommissioned or removed.
+        nodes.remove(node.getName());
+        rackMap.put(rackname, nodes);
+      }
+      countEmptyRacks();
+    }
+  }
+
+  private void countEmptyRacks() {
+    int count = 0;
+    for (Set<String> nodes : rackMap.values()) {
+      if (nodes != null && nodes.isEmpty()) {
+        count++;
+      }
+    }
+    numOfEmptyRacks = count;
+    LOG.debug("Current numOfEmptyRacks is {}", numOfEmptyRacks);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index dec98d85b5..39a40f52ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -304,7 +304,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
           && stats.isAvoidingStaleDataNodesForWrite());
     boolean avoidLocalRack = (addBlockFlags != null
         && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_RACK) && writer != null
-        && clusterMap.getNumOfRacks() > 2);
+        && clusterMap.getNumOfNonEmptyRacks() > 2);
     boolean avoidLocalNode = (addBlockFlags != null
         && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
         && writer != null
@@ -385,7 +385,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -1173,7 +1173,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
         .map(dn -> dn.getNetworkLocation()).distinct().count();
 
     return new BlockPlacementStatusDefault(Math.toIntExact(rackCount),
-        minRacks, clusterMap.getNumOfRacks());
+        minRacks, clusterMap.getNumOfNonEmptyRacks());
   }
 
   /**
@@ -1370,4 +1370,3 @@ public boolean getExcludeSlowNodesEnabled() {
     return excludeSlowNodesEnabled;
   }
 }
-
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
index dad877fdc7..a3b3f482e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java
@@ -42,7 +42,7 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
       totalNumOfReplicas = clusterSize;
     }
     // No calculation needed when there is only one rack or picking one node.
-    int numOfRacks = clusterMap.getNumOfRacks();
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     // HDFS-14527 return default when numOfRacks = 0 to avoid
     // ArithmeticException when calc maxNodesPerRack at following logic.
     if (numOfRacks <= 1 || totalNumOfReplicas <= 1) {
@@ -90,38 +90,39 @@ protected Node chooseTargetInOrder(int numOfReplicas,
                                  EnumMap<StorageType, Integer> storageTypes)
                                  throws NotEnoughReplicasException {
     int totalReplicaExpected = results.size() + numOfReplicas;
-    int numOfRacks = clusterMap.getNumOfRacks();
-    if (totalReplicaExpected < numOfRacks ||
-        totalReplicaExpected % numOfRacks == 0) {
-      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
-      return writer;
-    }
-
-    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
-
-    // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
-    // replicas.
-    HashMap<String, Integer> rackCounts = new HashMap<>();
-    for (DatanodeStorageInfo dsInfo : results) {
-      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
-      Integer count = rackCounts.get(rack);
-      if (count != null) {
-        rackCounts.put(rack, count + 1);
-      } else {
-        rackCounts.put(rack, 1);
-      }
-    }
-    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
-    for (int count : rackCounts.values()) {
-      if (count > maxNodesPerRack -1) {
-        excess += count - (maxNodesPerRack -1);
-      }
-    }
-    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
-        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+    int numOfRacks = clusterMap.getNumOfNonEmptyRacks();
     try {
+      if (totalReplicaExpected < numOfRacks ||
+          totalReplicaExpected % numOfRacks == 0) {
+        writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+        return writer;
+      }
+
+      assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+      // Calculate numOfReplicas for filling each rack exactly (maxNodesPerRack-1)
+      // replicas.
+      HashMap<String, Integer> rackCounts = new HashMap<>();
+      for (DatanodeStorageInfo dsInfo : results) {
+        String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+        Integer count = rackCounts.get(rack);
+        if (count != null) {
+          rackCounts.put(rack, count + 1);
+        } else {
+          rackCounts.put(rack, 1);
+        }
+      }
+      int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in results
+      for (int count : rackCounts.values()) {
+        if (count > maxNodesPerRack -1) {
+          excess += count - (maxNodesPerRack -1);
+        }
+      }
+      numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+          (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
       // Try to spread the replicas as evenly as possible across racks.
       // This is done by first placing with (maxNodesPerRack-1), then spreading
       // the remainder by calling again with maxNodesPerRack.
@@ -243,7 +244,7 @@ public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs,
       racks.add(dn.getNetworkLocation());
     }
     return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas,
-        clusterMap.getNumOfRacks());
+        clusterMap.getNumOfNonEmptyRacks());
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
index 42b6ddd8c7..1c95c26a19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -177,6 +177,8 @@ public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startDecommission(node);
+      // Update cluster's emptyRack
+      blockManager.getDatanodeManager().getNetworkTopology().decommissionNode(node);
       // hbManager.startDecommission will set dead node to decommissioned.
       if (node.isDecommissionInProgress()) {
         for (DatanodeStorageInfo storage : node.getStorageInfos()) {
@@ -201,6 +203,8 @@ public void stopDecommission(DatanodeDescriptor node) {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopDecommission(node);
+      // Update cluster's emptyRack
+      blockManager.getDatanodeManager().getNetworkTopology().recommissionNode(node);
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
@@ -413,4 +417,4 @@ void runMonitorForTest() throws ExecutionException, InterruptedException {
     executor.submit(monitor).get();
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f080a101f6..0a00aa65c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -8405,7 +8405,7 @@ public ECTopologyVerifierResult getECTopologyResultForPolicies(
           getBlockManager().getDatanodeManager().getNumOfDataNodes();
       int numOfRacks =
           getBlockManager().getDatanodeManager().getNetworkTopology()
-              .getNumOfRacks();
+              .getNumOfNonEmptyRacks();
       result = ECTopologyVerifier
           .getECTopologyVerifierResult(numOfRacks, numOfDataNodes, policies);
     }
@@ -8950,7 +8950,7 @@ private ECTopologyVerifierResult getEcTopologyVerifierResultForEnabledPolicies()
     int numOfDataNodes =
         getBlockManager().getDatanodeManager().getNumOfDataNodes();
     int numOfRacks = getBlockManager().getDatanodeManager().getNetworkTopology()
-        .getNumOfRacks();
+        .getNumOfNonEmptyRacks();
     ErasureCodingPolicy[] enabledEcPolicies =
         getErasureCodingPolicyManager().getCopyOfEnabledPolicies();
     return ECTopologyVerifier
@@ -9012,4 +9012,3 @@ public void checkErasureCodingSupported(String operationName)
       }
     }
   }
-
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
index 88b7d2bf7f..3beea47800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockPlacementPolicyRackFaultTolerant.java
@@ -19,24 +19,35 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -167,6 +178,108 @@ private void doTestChooseTargetSpecialCase() throws Exception {
     }
   }
 
+  /**
+   * Verify decommissioning a dn which is the only node in its rack.
+   */
+  @Test
+  public void testPlacementWithOnlyOneNodeInRackDecommission() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final String[] racks = {"/RACK0", "/RACK0", "/RACK2", "/RACK3", "/RACK4", "/RACK5", "/RACK2"};
+    final String[] hosts = {"/host0", "/host1", "/host2", "/host3", "/host4", "/host5", "/host6"};
+
+    // enables DFSNetworkTopology
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        BlockPlacementPolicyRackFaultTolerant.class,
+        BlockPlacementPolicy.class);
+    conf.setBoolean(DFSConfigKeys.DFS_USE_DFS_NETWORK_TOPOLOGY_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        DEFAULT_BLOCK_SIZE / 2);
+
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(7).racks(racks)
+        .hosts(hosts).build();
+    cluster.waitActive();
+    nameNodeRpc = cluster.getNameNodeRpc();
+    namesystem = cluster.getNamesystem();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    fs.enableErasureCodingPolicy("RS-3-2-1024k");
+    fs.setErasureCodingPolicy(new Path("/"), "RS-3-2-1024k");
+
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final DatanodeManager dm = bm.getDatanodeManager();
+    assertTrue(dm.getNetworkTopology() instanceof DFSNetworkTopology);
+
+    String clientMachine = "/host4";
+    String clientRack = "/RACK4";
+    String src = "/test";
+
+    final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+    DatanodeDescriptor dnd4 = dnm.getDatanode(cluster.getDataNodes().get(4).getDatanodeId());
+    assertEquals(dnd4.getNetworkLocation(), clientRack);
+    dnm.getDatanodeAdminManager().startDecommission(dnd4);
+    short replication = 5;
+    short additionalReplication = 1;
+
+    try {
+      // Create the file with client machine
+      HdfsFileStatus fileStatus = namesystem.startFile(src, perm,
+          clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true,
+          replication, DEFAULT_BLOCK_SIZE * 1024 * 10, null, null, null, false);
+
+      // test chooseTarget for new file
+      LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine,
+          null, null, fileStatus.getFileId(), null, null);
+      HashMap<String, Integer> racksCount = new HashMap<String, Integer>();
+      doTestLocatedBlockRacks(racksCount, replication, 4, locatedBlock);
+
+      // test chooseTarget for existing file.
+      LocatedBlock additionalLocatedBlock =
+          nameNodeRpc.getAdditionalDatanode(src, fileStatus.getFileId(),
+              locatedBlock.getBlock(), locatedBlock.getLocations(),
+              locatedBlock.getStorageIDs(), DatanodeInfo.EMPTY_ARRAY,
+              additionalReplication, clientMachine);
+
+      racksCount.clear();
+      doTestLocatedBlockRacks(racksCount, additionalReplication + replication,
+          4, additionalLocatedBlock);
+      assertEquals(racksCount.get("/RACK0"), (Integer)2);
+      assertEquals(racksCount.get("/RACK2"), (Integer)2);
+    } finally {
+      dnm.getDatanodeAdminManager().stopDecommission(dnd4);
+    }
+
+    // test if decommission succeeded
+    DatanodeDescriptor dnd3 = dnm.getDatanode(cluster.getDataNodes().get(3).getDatanodeId());
+    cluster.getNamesystem().writeLock();
+    try {
+      dm.getDatanodeAdminManager().startDecommission(dnd3);
+    } finally {
+      cluster.getNamesystem().writeUnlock();
+    }
+
+    // make sure the decommission finishes and the block is on 4 racks
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dnd3.isDecommissioned();
+      }
+    }, 1000, 10 * 1000);
+
+    LocatedBlocks locatedBlocks =
+        cluster.getFileSystem().getClient().getLocatedBlocks(
+            src, 0, DEFAULT_BLOCK_SIZE);
+    assertEquals(4, bm.getDatanodeManager().
+        getNetworkTopology().getNumOfNonEmptyRacks());
+    for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
+      BlockPlacementStatus status = bm.getStriptedBlockPlacementPolicy()
+          .verifyBlockPlacement(block.getLocations(), 5);
+      Assert.assertTrue(status.isPlacementPolicySatisfied());
+    }
+  }
+
   private void shuffle(DatanodeInfo[] locs, String[] storageIDs) {
     int length = locs.length;
     Object[][] pairs = new Object[length][];
@@ -198,6 +311,17 @@ private void doTestLocatedBlock(int replication, LocatedBlock locatedBlock) {
     assertTrue(maxCount - minCount <= 1);
   }
 
+  private void doTestLocatedBlockRacks(HashMap<String, Integer> racksCount, int replication,
+                                       int validracknum, LocatedBlock locatedBlock) {
+    assertEquals(replication, locatedBlock.getLocations().length);
+
+    for (DatanodeInfo node :
+        locatedBlock.getLocations()) {
+      addToRacksCount(node.getNetworkLocation(), racksCount);
+    }
+    assertEquals(validracknum, racksCount.size());
+  }
+
   private void addToRacksCount(String rack, HashMap<String, Integer> racksCount) {
     Integer count = racksCount.get(rack);
     if (count == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 5758fe7986..531f1eb0df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -633,4 +633,20 @@ public void testCountNumOfAvailableNodes() {
     numNodes = cluster.countNumOfAvailableNodes(NodeBase.ROOT, excludedNodes);
     assertEquals(12, numNodes);
   }
+
+  @Test
+  public void testAddAndRemoveNodeWithEmptyRack() {
+    DatanodeDescriptor n1 = DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3");
+    DatanodeDescriptor n2 = DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3");
+    DatanodeDescriptor n3 = DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3");
+
+    cluster.decommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+    cluster.decommissionNode(n2);
+    cluster.decommissionNode(n3);
+    assertEquals(5, cluster.getNumOfNonEmptyRacks());
+
+    cluster.recommissionNode(n1);
+    assertEquals(6, cluster.getNumOfNonEmptyRacks());
+  }
 }
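Reviewer note, not part of the patch: the sketch below is a minimal, self-contained illustration of the accounting the change introduces, mirroring testAddAndRemoveNodeWithEmptyRack above on a two-rack topology. It uses only the NetworkTopology API added by this diff plus the existing DFSTestUtil test helper; the class name EmptyRackAccountingSketch and the standalone main() wrapper are made up for the example.

// Illustrative sketch only (run with -ea so the asserts fire); not part of the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.NetworkTopology;

public class EmptyRackAccountingSketch {
  public static void main(String[] args) {
    NetworkTopology cluster = NetworkTopology.getInstance(new Configuration());

    // Two racks; /d1/r2 holds a single datanode.
    DatanodeDescriptor a = DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1");
    DatanodeDescriptor b = DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1");
    DatanodeDescriptor c = DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2");
    cluster.add(a);
    cluster.add(b);
    cluster.add(c);

    // While every rack has a live node, the two counters agree.
    assert cluster.getNumOfRacks() == 2;
    assert cluster.getNumOfNonEmptyRacks() == 2;

    // Decommissioning the only node on /d1/r2 empties that rack: the node stays
    // in the topology (getNumOfRacks() is unchanged), but the rack no longer
    // counts toward getNumOfNonEmptyRacks(), which the placement policies now use.
    cluster.decommissionNode(c);
    assert cluster.getNumOfRacks() == 2;
    assert cluster.getNumOfNonEmptyRacks() == 1;

    // Recommissioning the node makes the rack non-empty again.
    cluster.recommissionNode(c);
    assert cluster.getNumOfNonEmptyRacks() == 2;
  }
}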