diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index e9e2e5b499..d853ae945e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -501,14 +501,11 @@ public class DiskBalancer { public void run() { Thread.currentThread().setName("DiskBalancerThread"); LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}", - planFile, planID); - try { - for (Map.Entry entry : - workMap.entrySet()) { - blockMover.copyBlocks(entry.getKey(), entry.getValue()); - } - } finally { - blockMover.setExitFlag(); + planFile, planID); + for (Map.Entry entry : + workMap.entrySet()) { + blockMover.setRunnable(); + blockMover.copyBlocks(entry.getKey(), entry.getValue()); } } }); @@ -857,8 +854,8 @@ public class DiskBalancer { if (item.getErrorCount() >= getMaxError(item)) { item.setErrMsg("Error count exceeded."); - LOG.info("Maximum error count exceeded. Error count: {} Max error:{} " - , item.getErrorCount(), item.getMaxDiskErrors()); + LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ", + item.getErrorCount(), item.getMaxDiskErrors()); } return null; @@ -962,7 +959,8 @@ public class DiskBalancer { LOG.error("Exceeded the max error count. source {}, dest: {} " + "error count: {}", source.getBasePath(), dest.getBasePath(), item.getErrorCount()); - break; + this.setExitFlag(); + continue; } // Check for the block tolerance constraint. @@ -971,7 +969,8 @@ public class DiskBalancer { "blocks.", source.getBasePath(), dest.getBasePath(), item.getBytesCopied(), item.getBlocksCopied()); - break; + this.setExitFlag(); + continue; } ExtendedBlock block = getNextBlock(poolIters, item); @@ -979,7 +978,8 @@ public class DiskBalancer { if (block == null) { LOG.error("No source blocks, exiting the copy. Source: {}, " + "dest:{}", source.getBasePath(), dest.getBasePath()); - break; + this.setExitFlag(); + continue; } // check if someone told us exit, treat this as an interruption @@ -987,7 +987,7 @@ public class DiskBalancer { // for the thread, since both getNextBlock and moveBlocAcrossVolume // can take some time. if (!shouldRun()) { - break; + continue; } long timeUsed; @@ -1006,7 +1006,8 @@ public class DiskBalancer { LOG.error("Destination volume: {} does not have enough space to" + " accommodate a block. Block Size: {} Exiting from" + " copyBlocks.", dest.getBasePath(), block.getNumBytes()); - break; + this.setExitFlag(); + continue; } LOG.debug("Moved block with size {} from {} to {}", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index dc177fddd1..eb15bdc656 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; @@ -37,19 +36,18 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; -import org.apache.hadoop.hdfs.server.diskbalancer.datamodel - .DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.URISyntaxException; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; @@ -62,6 +60,7 @@ import static org.junit.Assert.assertTrue; public class TestDiskBalancer { private static final String PLAN_FILE = "/system/current.plan.json"; + static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class); @Test public void testDiskBalancerNameNodeConnectivity() throws Exception { @@ -110,227 +109,77 @@ public class TestDiskBalancer { */ @Test public void testDiskBalancerEndToEnd() throws Exception { + Configuration conf = new HdfsConfiguration(); - final int defaultBlockSize = 100; conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); - final int numDatanodes = 1; - final String fileName = "/tmp.txt"; - final Path filePath = new Path(fileName); - final int blocks = 100; - final int blocksSize = 1024; - final int fileLen = blocks * blocksSize; + final int blockCount = 100; + final int blockSize = 1024; + final int diskCount = 2; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final int sourceDiskIndex = 0; - - // Write a file and restart the cluster - long[] capacities = new long[]{defaultBlockSize * 2 * fileLen, - defaultBlockSize * 2 * fileLen}; - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(numDatanodes) - .storageCapacities(capacities) - .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK}) - .storagesPerDatanode(2) + MiniDFSCluster cluster = new ClusterBuilder() + .setBlockCount(blockCount) + .setBlockSize(blockSize) + .setDiskCount(diskCount) + .setNumDatanodes(dataNodeCount) + .setConf(conf) .build(); - FsVolumeImpl source = null; - FsVolumeImpl dest = null; try { - cluster.waitActive(); - Random r = new Random(); - FileSystem fs = cluster.getFileSystem(0); - TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, - numDatanodes - 1); - - DFSTestUtil.waitReplication(fs, filePath, (short) 1); - cluster.restartDataNodes(); - cluster.waitActive(); - - // Get the data node and move all data to one disk. - DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1); - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - source = (FsVolumeImpl) refs.get(0); - dest = (FsVolumeImpl) refs.get(1); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), - source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); - } - - cluster.restartDataNodes(); - cluster.waitActive(); - - // Start up a disk balancer and read the cluster info. - final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = - new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - List nodesToProcess = new LinkedList<>(); - - // Rewrite the capacity in the model to show that disks need - // re-balancing. - setVolumeCapacity(diskBalancerCluster, defaultBlockSize * 2 * fileLen, - "DISK"); - // Pick a node to process. - nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode - .getDatanodeUuid())); - diskBalancerCluster.setNodesToProcess(nodesToProcess); - - // Compute a plan. - List clusterplan = diskBalancerCluster.computePlan(0.0f); - - // Now we must have a plan,since the node is imbalanced and we - // asked the disk balancer to create a plan. - assertTrue(clusterplan.size() == 1); - - NodePlan plan = clusterplan.get(0); - plan.setNodeUUID(dnNode.getDatanodeUuid()); - plan.setTimeStamp(Time.now()); - String planJson = plan.toJson(); - String planID = DigestUtils.shaHex(planJson); - assertNotNull(plan.getVolumeSetPlans()); - assertTrue(plan.getVolumeSetPlans().size() > 0); - plan.getVolumeSetPlans().get(0).setTolerancePercent(10); - - // Submit the plan and wait till the execution is done. - newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false); - String jmxString = newDN.getDiskBalancerStatus(); - assertNotNull(jmxString); - DiskBalancerWorkStatus status = - DiskBalancerWorkStatus.parseJson(jmxString); - DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan(); - assertEquals(realStatus.getPlanID(), status.getPlanID()); - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - return newDN.queryDiskBalancerPlan().getResult() == - DiskBalancerWorkStatus.Result.PLAN_DONE; - } catch (IOException ex) { - return false; - } - } - }, 1000, 100000); - - - //verify that it worked. - dnNode = cluster.getDataNodes().get(numDatanodes - 1); - assertEquals(dnNode.queryDiskBalancerPlan().getResult(), - DiskBalancerWorkStatus.Result.PLAN_DONE); - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - source = (FsVolumeImpl) refs.get(0); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - } - - - // Tolerance - long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove() - * 10) / 100; - assertTrue( - (DiskBalancerTestUtil.getBlockCount(source) * - defaultBlockSize + delta) >= - plan.getVolumeSetPlans().get(0).getBytesToMove()); - + DataMover dataMover = new DataMover(cluster, dataNodeIndex, + sourceDiskIndex, conf, blockSize, blockCount); + dataMover.moveDataToSourceDisk(); + NodePlan plan = dataMover.generatePlan(); + dataMover.executePlan(plan); + dataMover.verifyPlanExectionDone(); + dataMover.verifyAllVolumesHaveData(); + dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); } finally { cluster.shutdown(); } } - @Test(timeout=60000) + @Test public void testBalanceDataBetweenMultiplePairsOfVolumes() throws Exception { + Configuration conf = new HdfsConfiguration(); - final int DEFAULT_BLOCK_SIZE = 2048; conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); - final int NUM_DATANODES = 1; - final long CAP = 512 * 1024; - final Path testFile = new Path("/testfile"); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(NUM_DATANODES) - .storageCapacities(new long[]{CAP, CAP, CAP, CAP}) - .storagesPerDatanode(4) + final int blockCount = 1000; + final int blockSize = 1024; + + // create 3 disks, that means we will have 2 plans + // Move Data from disk0->disk1 and disk0->disk2. + final int diskCount = 3; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final int sourceDiskIndex = 0; + + + MiniDFSCluster cluster = new ClusterBuilder() + .setBlockCount(blockCount) + .setBlockSize(blockSize) + .setDiskCount(diskCount) + .setNumDatanodes(dataNodeCount) + .setConf(conf) .build(); + + try { - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0); + DataMover dataMover = new DataMover(cluster, dataNodeIndex, + sourceDiskIndex, conf, blockSize, blockCount); + dataMover.moveDataToSourceDisk(); + NodePlan plan = dataMover.generatePlan(); - DFSTestUtil.waitReplication(fs, testFile, (short) 1); - DataNode dnNode = cluster.getDataNodes().get(0); - // Move data out of two volumes to make them empty. - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - assertEquals(4, refs.size()); - for (int i = 0; i < refs.size(); i += 2) { - FsVolumeImpl source = (FsVolumeImpl) refs.get(i); - FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), - source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); - } - } + // 3 disks , The plan should move data both disks, + // so we must have 2 plan steps. + assertEquals(plan.getVolumeSetPlans().size(), 2); - cluster.restartDataNodes(); - cluster.waitActive(); - - // Start up a disk balancer and read the cluster info. - final DataNode dataNode = cluster.getDataNodes().get(0); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = - new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - List nodesToProcess = new LinkedList<>(); - // Rewrite the capacity in the model to show that disks need - // re-balancing. - setVolumeCapacity(diskBalancerCluster, CAP, "DISK"); - nodesToProcess.add(diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid())); - diskBalancerCluster.setNodesToProcess(nodesToProcess); - - // Compute a plan. - List clusterPlan = diskBalancerCluster.computePlan(10.0f); - - NodePlan plan = clusterPlan.get(0); - assertEquals(2, plan.getVolumeSetPlans().size()); - plan.setNodeUUID(dnNode.getDatanodeUuid()); - plan.setTimeStamp(Time.now()); - String planJson = plan.toJson(); - String planID = DigestUtils.shaHex(planJson); - - dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false); - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - return dataNode.queryDiskBalancerPlan().getResult() == - DiskBalancerWorkStatus.Result.PLAN_DONE; - } catch (IOException ex) { - return false; - } - } - }, 1000, 100000); - assertEquals(dataNode.queryDiskBalancerPlan().getResult(), - DiskBalancerWorkStatus.Result.PLAN_DONE); - - try (FsDatasetSpi.FsVolumeReferences refs = - dataNode.getFSDataset().getFsVolumeReferences()) { - for (FsVolumeSpi vol : refs) { - assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0); - } - } + dataMover.executePlan(plan); + dataMover.verifyPlanExectionDone(); + dataMover.verifyAllVolumesHaveData(); + dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); } finally { cluster.shutdown(); } @@ -353,4 +202,293 @@ public class TestDiskBalancer { node.getVolumeSets().get(diskType).computeVolumeDataDensity(); } } + + /** + * Helper class that allows us to create different kinds of MiniDFSClusters + * and populate data. + */ + static class ClusterBuilder { + private Configuration conf; + private int blockSize; + private int numDatanodes; + private int fileLen; + private int blockCount; + private int diskCount; + + public ClusterBuilder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + public ClusterBuilder setBlockSize(int blockSize) { + this.blockSize = blockSize; + return this; + } + + public ClusterBuilder setNumDatanodes(int datanodeCount) { + this.numDatanodes = datanodeCount; + return this; + } + + public ClusterBuilder setBlockCount(int blockCount) { + this.blockCount = blockCount; + return this; + } + + public ClusterBuilder setDiskCount(int diskCount) { + this.diskCount = diskCount; + return this; + } + + private long[] getCapacities(int diskCount, int bSize, int fSize) { + Preconditions.checkState(diskCount > 0); + long[] capacities = new long[diskCount]; + for (int x = 0; x < diskCount; x++) { + capacities[x] = diskCount * bSize * fSize * 2L; + } + return capacities; + } + + private StorageType[] getStorageTypes(int diskCount) { + Preconditions.checkState(diskCount > 0); + StorageType[] array = new StorageType[diskCount]; + for (int x = 0; x < diskCount; x++) { + array[x] = StorageType.DISK; + } + return array; + } + + public MiniDFSCluster build() throws IOException, TimeoutException, + InterruptedException { + Preconditions.checkNotNull(this.conf); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(numDatanodes > 0); + fileLen = blockCount * blockSize; + Preconditions.checkState(fileLen > 0); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + + final String fileName = "/tmp.txt"; + Path filePath = new Path(fileName); + fileLen = blockCount * blockSize; + + + // Write a file and restart the cluster + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .storageCapacities(getCapacities(diskCount, blockSize, fileLen)) + .storageTypes(getStorageTypes(diskCount)) + .storagesPerDatanode(diskCount) + .build(); + generateData(filePath, cluster); + cluster.restartDataNodes(); + cluster.waitActive(); + return cluster; + } + + private void generateData(Path filePath, MiniDFSCluster cluster) + throws IOException, InterruptedException, TimeoutException { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(0); + TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, + numDatanodes - 1); + DFSTestUtil.waitReplication(fs, filePath, (short) 1); + cluster.restartDataNodes(); + cluster.waitActive(); + } + } + + class DataMover { + private final MiniDFSCluster cluster; + private final int sourceDiskIndex; + private final int dataNodeIndex; + private final Configuration conf; + private final int blockCount; + private final int blockSize; + private DataNode node; + + /** + * Constructs a DataMover class. + * + * @param cluster - MiniDFSCluster. + * @param dataNodeIndex - Datanode to operate against. + * @param sourceDiskIndex - source Disk Index. + */ + public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int + sourceDiskIndex, Configuration conf, int blockSize, int + blockCount) { + this.cluster = cluster; + this.dataNodeIndex = dataNodeIndex; + this.node = cluster.getDataNodes().get(dataNodeIndex); + this.sourceDiskIndex = sourceDiskIndex; + this.conf = conf; + this.blockCount = blockCount; + this.blockSize = blockSize; + } + + /** + * Moves all data to a source disk to create disk imbalance so we can run a + * planner. + * + * @throws IOException + */ + public void moveDataToSourceDisk() throws IOException { + moveAllDataToDestDisk(this.node, sourceDiskIndex); + cluster.restartDataNodes(); + cluster.waitActive(); + + } + + /** + * Moves all data in the data node to one disk. + * + * @param dataNode - Datanode + * @param destDiskindex - Index of the destination disk. + */ + private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex) + throws IOException { + Preconditions.checkNotNull(dataNode); + Preconditions.checkState(destDiskindex >= 0); + try (FsDatasetSpi.FsVolumeReferences refs = + dataNode.getFSDataset().getFsVolumeReferences()) { + if (refs.size() <= destDiskindex) { + throw new IllegalArgumentException("Invalid Disk index."); + } + FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex); + for (int x = 0; x < refs.size(); x++) { + if (x == destDiskindex) { + continue; + } + FsVolumeImpl source = (FsVolumeImpl) refs.get(x); + DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(), + source, dest); + + } + } + } + + /** + * Generates a NodePlan for the datanode specified. + * + * @return NodePlan. + */ + public NodePlan generatePlan() throws Exception { + + // Start up a disk balancer and read the cluster info. + node = cluster.getDataNodes().get(dataNodeIndex); + ClusterConnector nameNodeConnector = + ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex) + .getUri(), conf); + + DiskBalancerCluster diskBalancerCluster = + new DiskBalancerCluster(nameNodeConnector); + diskBalancerCluster.readClusterInfo(); + List nodesToProcess = new LinkedList<>(); + + // Rewrite the capacity in the model to show that disks need + // re-balancing. + setVolumeCapacity(diskBalancerCluster, blockSize * 2L * blockCount, + "DISK"); + // Pick a node to process. + nodesToProcess.add(diskBalancerCluster.getNodeByUUID( + node.getDatanodeUuid())); + diskBalancerCluster.setNodesToProcess(nodesToProcess); + + // Compute a plan. + List clusterplan = diskBalancerCluster.computePlan(0.0f); + + // Now we must have a plan,since the node is imbalanced and we + // asked the disk balancer to create a plan. + assertTrue(clusterplan.size() == 1); + + NodePlan plan = clusterplan.get(0); + plan.setNodeUUID(node.getDatanodeUuid()); + plan.setTimeStamp(Time.now()); + + assertNotNull(plan.getVolumeSetPlans()); + assertTrue(plan.getVolumeSetPlans().size() > 0); + plan.getVolumeSetPlans().get(0).setTolerancePercent(10); + return plan; + } + + /** + * Waits for a plan executing to finish. + */ + public void executePlan(NodePlan plan) throws + IOException, TimeoutException, InterruptedException { + + node = cluster.getDataNodes().get(dataNodeIndex); + String planJson = plan.toJson(); + String planID = DigestUtils.shaHex(planJson); + + // Submit the plan and wait till the execution is done. + node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, + false); + String jmxString = node.getDiskBalancerStatus(); + assertNotNull(jmxString); + DiskBalancerWorkStatus status = + DiskBalancerWorkStatus.parseJson(jmxString); + DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan(); + assertEquals(realStatus.getPlanID(), status.getPlanID()); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + return node.queryDiskBalancerPlan().getResult() == + DiskBalancerWorkStatus.Result.PLAN_DONE; + } catch (IOException ex) { + return false; + } + } + }, 1000, 100000); + } + + /** + * Verifies the Plan Execution has been done. + */ + public void verifyPlanExectionDone() throws IOException { + node = cluster.getDataNodes().get(dataNodeIndex); + assertEquals(node.queryDiskBalancerPlan().getResult(), + DiskBalancerWorkStatus.Result.PLAN_DONE); + } + + /** + * Once diskBalancer is run, all volumes mush has some data. + */ + public void verifyAllVolumesHaveData() throws IOException { + node = cluster.getDataNodes().get(dataNodeIndex); + try (FsDatasetSpi.FsVolumeReferences refs = + node.getFSDataset().getFsVolumeReferences()) { + for (FsVolumeSpi volume : refs) { + assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); + LOG.info(refs.toString() + " : Block Count : {}", + DiskBalancerTestUtil.getBlockCount(volume)); + } + } + } + + /** + * Verifies that tolerance values are honored correctly. + */ + public void verifyTolerance(NodePlan plan, int planIndex, int + sourceDiskIndex, int tolerance) throws IOException { + // Tolerance + long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove() + * tolerance) / 100; + FsVolumeImpl volume = null; + try (FsDatasetSpi.FsVolumeReferences refs = + node.getFSDataset().getFsVolumeReferences()) { + volume = (FsVolumeImpl) refs.get(sourceDiskIndex); + assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); + + assertTrue( + (DiskBalancerTestUtil.getBlockCount(volume) * + (blockSize + delta)) >= + plan.getVolumeSetPlans().get(0).getBytesToMove()); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index c362f49706..794a887aa6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -358,14 +358,13 @@ public class TestDiskBalancerWithMockMover { private AtomicBoolean shouldRun; private FsDatasetSpi dataset; - private Integer runCount; + private int runCount; private volatile boolean sleepInCopyBlocks; private long delay; public TestMover(FsDatasetSpi dataset) { this.dataset = dataset; this.shouldRun = new AtomicBoolean(false); - this.runCount = new Integer(0); } public void setSleep() { @@ -401,7 +400,7 @@ public class TestDiskBalancerWithMockMover { if (delay > 0) { Thread.sleep(delay); } - synchronized (runCount) { + synchronized (this) { if (shouldRun()) { runCount++; } @@ -461,9 +460,9 @@ public class TestDiskBalancerWithMockMover { } public int getRunCount() { - synchronized (runCount) { - LOG.info("Run count : " + runCount.intValue()); - return runCount.intValue(); + synchronized (this) { + LOG.info("Run count : " + runCount); + return runCount; } } } @@ -510,7 +509,7 @@ public class TestDiskBalancerWithMockMover { } } - private class DiskBalancerBuilder { + private static class DiskBalancerBuilder { private TestMover blockMover; private Configuration conf; private String nodeID; @@ -546,7 +545,7 @@ public class TestDiskBalancerWithMockMover { } } - private class DiskBalancerClusterBuilder { + private static class DiskBalancerClusterBuilder { private String jsonFilePath; private Configuration conf; @@ -573,7 +572,7 @@ public class TestDiskBalancerWithMockMover { } } - private class PlanBuilder { + private static class PlanBuilder { private String sourcePath; private String destPath; private String sourceUUID;