diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c6948f880e..cb2c913666 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -486,10 +486,13 @@ private void executePlan() {
       public void run() {
         Thread.currentThread().setName("DiskBalancerThread");
         LOG.info("Executing Disk balancer plan. Plan ID - " + planID);
-
-        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
-            workMap.entrySet()) {
-          blockMover.copyBlocks(entry.getKey(), entry.getValue());
+        try {
+          for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+              workMap.entrySet()) {
+            blockMover.copyBlocks(entry.getKey(), entry.getValue());
+          }
+        } finally {
+          blockMover.setExitFlag();
         }
       }
     });
@@ -943,8 +946,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
             LOG.error("Exceeded the max error count. source {}, dest: {} " +
                     "error count: {}", source.getBasePath(),
                 dest.getBasePath(), item.getErrorCount());
-            this.setExitFlag();
-            continue;
+            break;
           }
 
           // Check for the block tolerance constraint.
@@ -953,17 +955,15 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
                     "blocks.",
                 source.getBasePath(), dest.getBasePath(),
                 item.getBytesCopied(), item.getBlocksCopied());
-            this.setExitFlag();
-            continue;
+            break;
           }
 
           ExtendedBlock block = getNextBlock(poolIters, item);
           // we are not able to find any blocks to copy.
           if (block == null) {
-            this.setExitFlag();
             LOG.error("No source blocks, exiting the copy. Source: {}, " +
                 "dest:{}", source.getBasePath(), dest.getBasePath());
-            continue;
+            break;
           }
 
           // check if someone told us exit, treat this as an interruption
@@ -971,7 +971,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
          // for the thread, since both getNextBlock and moveBlocAcrossVolume
          // can take some time.
          if (!shouldRun()) {
-            continue;
+            break;
          }
 
          long timeUsed;
@@ -990,8 +990,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
             LOG.error("Destination volume: {} does not have enough space to" +
                 " accommodate a block. Block Size: {} Exiting from" +
                 " copyBlocks.", dest.getBasePath(), block.getNumBytes());
-            this.setExitFlag();
-            continue;
+            break;
           }
 
           LOG.debug("Moved block with size {} from {} to {}",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
index 1e105393e1..f27b9314c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java
@@ -25,12 +25,14 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
@@ -44,9 +46,11 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -190,7 +194,6 @@ public void testDiskBalancerEndToEnd() throws Exception {
       assertTrue(plan.getVolumeSetPlans().size() > 0);
       plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
 
-
       // Submit the plan and wait till the execution is done.
       newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
       String jmxString = newDN.getDiskBalancerStatus();
@@ -237,6 +240,100 @@ public Boolean get() {
     }
   }
 
+  @Test(timeout=60000)
+  public void testBalanceDataBetweenMultiplePairsOfVolumes()
+      throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int DEFAULT_BLOCK_SIZE = 2048;
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    final int NUM_DATANODES = 1;
+    final long CAP = 512 * 1024;
+    final Path testFile = new Path("/testfile");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_DATANODES)
+        .storageCapacities(new long[]{CAP, CAP, CAP, CAP})
+        .storagesPerDatanode(4)
+        .build();
+    try {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
+
+      DFSTestUtil.waitReplication(fs, testFile, (short) 1);
+      DataNode dnNode = cluster.getDataNodes().get(0);
+      // Move data out of two volumes to make them empty.
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dnNode.getFSDataset().getFsVolumeReferences()) {
+        assertEquals(4, refs.size());
+        for (int i = 0; i < refs.size(); i += 2) {
+          FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
+          FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
+          assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+          DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
+              source, dest);
+          assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+        }
+      }
+
+      cluster.restartDataNodes();
+      cluster.waitActive();
+
+      // Start up a disk balancer and read the cluster info.
+      final DataNode dataNode = cluster.getDataNodes().get(0);
+      ClusterConnector nameNodeConnector =
+          ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
+
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(nameNodeConnector);
+      diskBalancerCluster.readClusterInfo();
+      List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
+      // Rewrite the capacity in the model to show that disks need
+      // re-balancing.
+      setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
+      nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
+          dataNode.getDatanodeUuid()));
+      diskBalancerCluster.setNodesToProcess(nodesToProcess);
+
+      // Compute a plan.
+      List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
+
+      NodePlan plan = clusterPlan.get(0);
+      assertEquals(2, plan.getVolumeSetPlans().size());
+      plan.setNodeUUID(dnNode.getDatanodeUuid());
+      plan.setTimeStamp(Time.now());
+      String planJson = plan.toJson();
+      String planID = DigestUtils.sha512Hex(planJson);
+
+      dataNode.submitDiskBalancerPlan(planID, 1, planJson, false);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            return dataNode.queryDiskBalancerPlan().getResult() ==
+                DiskBalancerWorkStatus.Result.PLAN_DONE;
+          } catch (IOException ex) {
+            return false;
+          }
+        }
+      }, 1000, 100000);
+      assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
+          DiskBalancerWorkStatus.Result.PLAN_DONE);
+
+      try (FsDatasetSpi.FsVolumeReferences refs =
+               dataNode.getFSDataset().getFsVolumeReferences()) {
+        for (FsVolumeSpi vol : refs) {
+          assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Sets alll Disks capacity to size specified.
   *