HDFS-10598. DiskBalancer does not execute multi-steps plan. Contributed by Lei (Eddy) Xu.
This commit is contained in:
parent
255ea45e50
commit
d84ab8a578
@ -486,11 +486,14 @@ private void executePlan() {
|
|||||||
public void run() {
|
public void run() {
|
||||||
Thread.currentThread().setName("DiskBalancerThread");
|
Thread.currentThread().setName("DiskBalancerThread");
|
||||||
LOG.info("Executing Disk balancer plan. Plan ID - " + planID);
|
LOG.info("Executing Disk balancer plan. Plan ID - " + planID);
|
||||||
|
try {
|
||||||
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
|
for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
|
||||||
workMap.entrySet()) {
|
workMap.entrySet()) {
|
||||||
blockMover.copyBlocks(entry.getKey(), entry.getValue());
|
blockMover.copyBlocks(entry.getKey(), entry.getValue());
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
blockMover.setExitFlag();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -943,8 +946,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
|
|||||||
LOG.error("Exceeded the max error count. source {}, dest: {} " +
|
LOG.error("Exceeded the max error count. source {}, dest: {} " +
|
||||||
"error count: {}", source.getBasePath(),
|
"error count: {}", source.getBasePath(),
|
||||||
dest.getBasePath(), item.getErrorCount());
|
dest.getBasePath(), item.getErrorCount());
|
||||||
this.setExitFlag();
|
break;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for the block tolerance constraint.
|
// Check for the block tolerance constraint.
|
||||||
@ -953,17 +955,15 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
|
|||||||
"blocks.",
|
"blocks.",
|
||||||
source.getBasePath(), dest.getBasePath(),
|
source.getBasePath(), dest.getBasePath(),
|
||||||
item.getBytesCopied(), item.getBlocksCopied());
|
item.getBytesCopied(), item.getBlocksCopied());
|
||||||
this.setExitFlag();
|
break;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ExtendedBlock block = getNextBlock(poolIters, item);
|
ExtendedBlock block = getNextBlock(poolIters, item);
|
||||||
// we are not able to find any blocks to copy.
|
// we are not able to find any blocks to copy.
|
||||||
if (block == null) {
|
if (block == null) {
|
||||||
this.setExitFlag();
|
|
||||||
LOG.error("No source blocks, exiting the copy. Source: {}, " +
|
LOG.error("No source blocks, exiting the copy. Source: {}, " +
|
||||||
"dest:{}", source.getBasePath(), dest.getBasePath());
|
"dest:{}", source.getBasePath(), dest.getBasePath());
|
||||||
continue;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if someone told us exit, treat this as an interruption
|
// check if someone told us exit, treat this as an interruption
|
||||||
@ -971,7 +971,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
|
|||||||
// for the thread, since both getNextBlock and moveBlocAcrossVolume
|
// for the thread, since both getNextBlock and moveBlocAcrossVolume
|
||||||
// can take some time.
|
// can take some time.
|
||||||
if (!shouldRun()) {
|
if (!shouldRun()) {
|
||||||
continue;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
long timeUsed;
|
long timeUsed;
|
||||||
@ -990,8 +990,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
|
|||||||
LOG.error("Destination volume: {} does not have enough space to" +
|
LOG.error("Destination volume: {} does not have enough space to" +
|
||||||
" accommodate a block. Block Size: {} Exiting from" +
|
" accommodate a block. Block Size: {} Exiting from" +
|
||||||
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
|
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
|
||||||
this.setExitFlag();
|
break;
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Moved block with size {} from {} to {}",
|
LOG.debug("Moved block with size {} from {} to {}",
|
||||||
|
@ -25,12 +25,14 @@
|
|||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
||||||
@ -44,9 +46,11 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
@ -190,7 +194,6 @@ public void testDiskBalancerEndToEnd() throws Exception {
|
|||||||
assertTrue(plan.getVolumeSetPlans().size() > 0);
|
assertTrue(plan.getVolumeSetPlans().size() > 0);
|
||||||
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
|
plan.getVolumeSetPlans().get(0).setTolerancePercent(10);
|
||||||
|
|
||||||
|
|
||||||
// Submit the plan and wait till the execution is done.
|
// Submit the plan and wait till the execution is done.
|
||||||
newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
|
newDN.submitDiskBalancerPlan(planID, 1, planJson, false);
|
||||||
String jmxString = newDN.getDiskBalancerStatus();
|
String jmxString = newDN.getDiskBalancerStatus();
|
||||||
@ -237,6 +240,100 @@ public Boolean get() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testBalanceDataBetweenMultiplePairsOfVolumes()
|
||||||
|
throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
final int DEFAULT_BLOCK_SIZE = 2048;
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
final int NUM_DATANODES = 1;
|
||||||
|
final long CAP = 512 * 1024;
|
||||||
|
final Path testFile = new Path("/testfile");
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(NUM_DATANODES)
|
||||||
|
.storageCapacities(new long[]{CAP, CAP, CAP, CAP})
|
||||||
|
.storagesPerDatanode(4)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0);
|
||||||
|
|
||||||
|
DFSTestUtil.waitReplication(fs, testFile, (short) 1);
|
||||||
|
DataNode dnNode = cluster.getDataNodes().get(0);
|
||||||
|
// Move data out of two volumes to make them empty.
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences refs =
|
||||||
|
dnNode.getFSDataset().getFsVolumeReferences()) {
|
||||||
|
assertEquals(4, refs.size());
|
||||||
|
for (int i = 0; i < refs.size(); i += 2) {
|
||||||
|
FsVolumeImpl source = (FsVolumeImpl) refs.get(i);
|
||||||
|
FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1);
|
||||||
|
assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
|
||||||
|
DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
|
||||||
|
source, dest);
|
||||||
|
assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.restartDataNodes();
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// Start up a disk balancer and read the cluster info.
|
||||||
|
final DataNode dataNode = cluster.getDataNodes().get(0);
|
||||||
|
ClusterConnector nameNodeConnector =
|
||||||
|
ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf);
|
||||||
|
|
||||||
|
DiskBalancerCluster diskBalancerCluster =
|
||||||
|
new DiskBalancerCluster(nameNodeConnector);
|
||||||
|
diskBalancerCluster.readClusterInfo();
|
||||||
|
List<DiskBalancerDataNode> nodesToProcess = new LinkedList<>();
|
||||||
|
// Rewrite the capacity in the model to show that disks need
|
||||||
|
// re-balancing.
|
||||||
|
setVolumeCapacity(diskBalancerCluster, CAP, "DISK");
|
||||||
|
nodesToProcess.add(diskBalancerCluster.getNodeByUUID(
|
||||||
|
dataNode.getDatanodeUuid()));
|
||||||
|
diskBalancerCluster.setNodesToProcess(nodesToProcess);
|
||||||
|
|
||||||
|
// Compute a plan.
|
||||||
|
List<NodePlan> clusterPlan = diskBalancerCluster.computePlan(10.0f);
|
||||||
|
|
||||||
|
NodePlan plan = clusterPlan.get(0);
|
||||||
|
assertEquals(2, plan.getVolumeSetPlans().size());
|
||||||
|
plan.setNodeUUID(dnNode.getDatanodeUuid());
|
||||||
|
plan.setTimeStamp(Time.now());
|
||||||
|
String planJson = plan.toJson();
|
||||||
|
String planID = DigestUtils.sha512Hex(planJson);
|
||||||
|
|
||||||
|
dataNode.submitDiskBalancerPlan(planID, 1, planJson, false);
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
return dataNode.queryDiskBalancerPlan().getResult() ==
|
||||||
|
DiskBalancerWorkStatus.Result.PLAN_DONE;
|
||||||
|
} catch (IOException ex) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 1000, 100000);
|
||||||
|
assertEquals(dataNode.queryDiskBalancerPlan().getResult(),
|
||||||
|
DiskBalancerWorkStatus.Result.PLAN_DONE);
|
||||||
|
|
||||||
|
try (FsDatasetSpi.FsVolumeReferences refs =
|
||||||
|
dataNode.getFSDataset().getFsVolumeReferences()) {
|
||||||
|
for (FsVolumeSpi vol : refs) {
|
||||||
|
assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets alll Disks capacity to size specified.
|
* Sets alll Disks capacity to size specified.
|
||||||
*
|
*
|
||||||
|
Loading…
Reference in New Issue
Block a user