From 7060725662cb3317ff2f0fcc38f965fd23e8e6aa Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Tue, 6 Mar 2018 09:09:32 -0800 Subject: [PATCH] HDFS-13188. Disk Balancer: Support multiple block pools during block move. Contributed by Bharat Viswanadham. --- .../hdfs/server/datanode/DiskBalancer.java | 4 +- .../diskbalancer/DiskBalancerTestUtil.java | 21 ++++-- .../server/diskbalancer/TestDiskBalancer.java | 72 ++++++++++++++++--- .../diskbalancer/TestDiskBalancerRPC.java | 4 +- 4 files changed, 85 insertions(+), 16 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index e90a47ef85..91c3624024 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -958,8 +958,8 @@ ExtendedBlock getNextBlock(List poolIters, ExtendedBlock block = null; while (block == null && currentCount < poolIters.size()) { currentCount++; - poolIndex = poolIndex++ % poolIters.size(); - FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(poolIndex); + int index = poolIndex++ % poolIters.size(); + FsVolumeSpi.BlockIterator currentPoolIter = poolIters.get(index); block = getBlockToCopy(currentPoolIter, item); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java index bc4181bc68..bd8dbce85b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java @@ -40,6 +40,10 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -51,6 +55,7 @@ * Helper class to create various cluster configurations at run time. */ public class DiskBalancerTestUtil { + static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class); public static final long MB = 1024 * 1024L; public static final long GB = MB * 1024L; public static final long TB = GB * 1024L; @@ -241,17 +246,25 @@ public DiskBalancerCluster createRandCluster(int dataNodeCount, * @return Number of Blocks. 
* @throws IOException */ - public static int getBlockCount(FsVolumeSpi source) throws IOException { + public static int getBlockCount(FsVolumeSpi source, + boolean checkblockPoolCount) + throws IOException { int count = 0; for (String blockPoolID : source.getBlockPoolList()) { FsVolumeSpi.BlockIterator sourceIter = source.newBlockIterator(blockPoolID, "TestDiskBalancerSource"); + int blockCount = 0; while (!sourceIter.atEnd()) { ExtendedBlock block = sourceIter.nextBlock(); if (block != null) { - count++; + blockCount++; } } + if (checkblockPoolCount) { + LOG.info("Block Pool Id: {}, blockCount: {}", blockPoolID, blockCount); + assertTrue(blockCount > 0); + } + count += blockCount; } return count; } @@ -320,10 +333,10 @@ public static MiniDFSCluster newImbalancedCluster( dnNode.getFSDataset().getFsVolumeReferences()) { source = (FsVolumeImpl) refs.get(0); dest = (FsVolumeImpl) refs.get(1); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); + assertTrue(DiskBalancerTestUtil.getBlockCount(source, true) > 0); DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); + assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index 556803228d..deae6eab7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -160,6 +161,62 @@ public void testDiskBalancerEndToEnd() throws Exception { } } + @Test + public void testDiskBalancerWithFederatedCluster() throws Exception { + + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + final int blockCount = 100; + final int blockSize = 1024; + final int diskCount = 2; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final int sourceDiskIndex = 0; + final long cap = blockSize * 3L * blockCount; + + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(dataNodeCount) + .storagesPerDatanode(diskCount) + .storageCapacities(new long[] {cap, cap}) + .build(); + cluster.waitActive(); + + DFSTestUtil.setFederatedConfiguration(cluster, conf); + + final String fileName = "/tmp.txt"; + final Path filePath = new Path(fileName); + long fileLen = blockCount * blockSize; + + + FileSystem fs = cluster.getFileSystem(0); + TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, + 0); + DFSTestUtil.waitReplication(fs, filePath, (short) 1); + + fs = cluster.getFileSystem(1); + TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, + 1); + DFSTestUtil.waitReplication(fs, 
filePath, (short) 1); + + try { + DataMover dataMover = new DataMover(cluster, dataNodeIndex, + sourceDiskIndex, conf, blockSize, blockCount); + dataMover.moveDataToSourceDisk(); + NodePlan plan = dataMover.generatePlan(); + dataMover.executePlan(plan); + dataMover.verifyPlanExectionDone(); + dataMover.verifyAllVolumesHaveData(); + dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); + } finally { + cluster.shutdown(); + } + + } + @Test public void testBalanceDataBetweenMultiplePairsOfVolumes() throws Exception { @@ -599,9 +656,9 @@ public void verifyAllVolumesHaveData() throws IOException { try (FsDatasetSpi.FsVolumeReferences refs = node.getFSDataset().getFsVolumeReferences()) { for (FsVolumeSpi volume : refs) { - assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); - LOG.info(refs.toString() + " : Block Count : {}", - DiskBalancerTestUtil.getBlockCount(volume)); + assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0); + LOG.info("{} : Block Count : {}", refs, DiskBalancerTestUtil + .getBlockCount(volume, true)); } } } @@ -618,12 +675,11 @@ public void verifyTolerance(NodePlan plan, int planIndex, int try (FsDatasetSpi.FsVolumeReferences refs = node.getFSDataset().getFsVolumeReferences()) { volume = (FsVolumeImpl) refs.get(sourceDiskIndex); - assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); + assertTrue(DiskBalancerTestUtil.getBlockCount(volume, true) > 0); - assertTrue( - (DiskBalancerTestUtil.getBlockCount(volume) * - (blockSize + delta)) >= - plan.getVolumeSetPlans().get(0).getBytesToMove()); + assertTrue((DiskBalancerTestUtil.getBlockCount(volume, true) * + (blockSize + delta)) >= plan.getVolumeSetPlans().get(0) + .getBytesToMove()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java index 368fd89eec..9c68f8d926 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java @@ -51,7 +51,7 @@ import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE; import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_UNDER_PROGRESS; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; /** * Test DiskBalancer RPC. @@ -265,7 +265,7 @@ public void testMoveBlockAcrossVolume() throws Exception { dest = (FsVolumeImpl) refs.get(1); DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); + assertEquals(0, DiskBalancerTestUtil.getBlockCount(source, false)); } finally { refs.close(); }
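
A note on the core change in DiskBalancer#getNextBlock: the removed line

    poolIndex = poolIndex++ % poolIters.size();

is effectively a no-op in Java. The post-increment yields the old value of poolIndex, the modulo is computed from that old value, and the assignment then overwrites the increment, so poolIndex never advances and the same block pool iterator is consulted on every call. The patch avoids this by letting poolIndex++ take effect and storing the modulo result in a separate local index used for the lookup. A minimal, self-contained sketch of the same semantics (class and variable names below are illustrative only, not part of the patch):

    // Demonstrates why "x = x++ % n" never advances x in Java,
    // and why the patched form (separate local index) does.
    public class PostIncrementDemo {
      public static void main(String[] args) {
        final int n = 3;

        int x = 0;
        x = x++ % n;           // post-increment result is overwritten by the assignment
        System.out.println(x); // prints 0: x is stuck, mirroring the removed code

        int y = 0;
        int index = y++ % n;   // patched form: increment kept, local index used for lookup
        System.out.println(y + " " + index); // prints "1 0": y advances to the next pool
      }
    }

With the old behavior, a DataNode carrying more than one block pool (for example the federated setup exercised by the new testDiskBalancerWithFederatedCluster) would keep drawing blocks from a single pool; with the fix, the block pool iterators are visited round-robin across calls.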