HDFS-17037. Consider nonDfsUsed when running balancer. (#5715). Contributed by Shuyan Zhang.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
7bb09f1010
commit
9c989515ba
@ -75,7 +75,7 @@ public long getNonDfsUsed() {
|
||||
}
|
||||
|
||||
public long getRemaining() {
|
||||
return remaining;
|
||||
return Math.max(remaining, 0L);
|
||||
}
|
||||
|
||||
public long getBlockPoolUsed() {
|
||||
|
@ -104,21 +104,21 @@ void accumulateSpaces(DatanodeStorageReport r) {
|
||||
for(StorageReport s : r.getStorageReports()) {
|
||||
final StorageType t = s.getStorage().getStorageType();
|
||||
totalCapacities.add(t, s.getCapacity());
|
||||
totalUsedSpaces.add(t, s.getDfsUsed());
|
||||
totalUsedSpaces.add(t, s.getCapacity() - s.getRemaining());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
Double getUtilization(DatanodeStorageReport r, final StorageType t) {
|
||||
long capacity = 0L;
|
||||
long dfsUsed = 0L;
|
||||
long totalUsed = 0L;
|
||||
for(StorageReport s : r.getStorageReports()) {
|
||||
if (s.getStorage().getStorageType() == t) {
|
||||
capacity += s.getCapacity();
|
||||
dfsUsed += s.getDfsUsed();
|
||||
totalUsed += s.getCapacity() - s.getRemaining();
|
||||
}
|
||||
}
|
||||
return capacity == 0L? null: dfsUsed*100.0/capacity;
|
||||
return capacity == 0L ? null : totalUsed * 100.0 / capacity;
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,7 +138,13 @@ String getName() {
|
||||
void accumulateSpaces(DatanodeStorageReport r) {
|
||||
for(StorageReport s : r.getStorageReports()) {
|
||||
final StorageType t = s.getStorage().getStorageType();
|
||||
totalCapacities.add(t, s.getCapacity());
|
||||
// Use s.getRemaining() + s.getBlockPoolUsed() instead of
|
||||
// s.getCapacity() here to avoid moving blocks towards nodes with
|
||||
// little actual available space.
|
||||
// The util is computed as blockPoolUsed/(remaining+blockPoolUsed),
|
||||
// which means nodes with more remaining space and less blockPoolUsed
|
||||
// will serve as the recipient during the balancing process.
|
||||
totalCapacities.add(t, s.getRemaining() + s.getBlockPoolUsed());
|
||||
totalUsedSpaces.add(t, s.getBlockPoolUsed());
|
||||
}
|
||||
}
|
||||
@ -149,11 +155,11 @@ Double getUtilization(DatanodeStorageReport r, final StorageType t) {
|
||||
long blockPoolUsed = 0L;
|
||||
for(StorageReport s : r.getStorageReports()) {
|
||||
if (s.getStorage().getStorageType() == t) {
|
||||
capacity += s.getCapacity();
|
||||
capacity += s.getRemaining() + s.getBlockPoolUsed();
|
||||
blockPoolUsed += s.getBlockPoolUsed();
|
||||
}
|
||||
}
|
||||
return capacity == 0L? null: blockPoolUsed*100.0/capacity;
|
||||
return capacity == 0L ? null : blockPoolUsed * 100.0 / capacity;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,6 +41,8 @@
|
||||
import java.lang.reflect.Field;
|
||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||
import org.junit.AfterClass;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset.CONFIG_PROPERTY_NONDFSUSED;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -502,8 +504,9 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
balanced = true;
|
||||
int actualExcludedNodeCount = 0;
|
||||
for (DatanodeInfo datanode : datanodeReport) {
|
||||
double nodeUtilization = ((double)datanode.getDfsUsed())
|
||||
/ datanode.getCapacity();
|
||||
double nodeUtilization =
|
||||
((double) datanode.getDfsUsed() + datanode.getNonDfsUsed()) /
|
||||
datanode.getCapacity();
|
||||
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
|
||||
if (checkExcludeNodesUtilization) {
|
||||
assertTrue(nodeUtilization == 0);
|
||||
@ -641,7 +644,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile) throws Exception {
|
||||
doTest(conf, capacities, racks, newCapacity, newRack, nodes,
|
||||
doTest(conf, capacities, racks, newCapacity, 0L, newRack, nodes,
|
||||
useTool, useFile, false, 0.3);
|
||||
}
|
||||
|
||||
@ -666,8 +669,8 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
* @throws Exception
|
||||
*/
|
||||
private void doTest(Configuration conf, long[] capacities,
|
||||
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile,
|
||||
String[] racks, long newCapacity, long newNonDfsUsed, String newRack,
|
||||
NewNodeInfo nodes, boolean useTool, boolean useFile,
|
||||
boolean useNamesystemSpy, double clusterUtilization) throws Exception {
|
||||
LOG.info("capacities = " + long2String(capacities));
|
||||
LOG.info("racks = " + Arrays.asList(racks));
|
||||
@ -701,10 +704,11 @@ private void doTest(Configuration conf, long[] capacities,
|
||||
long totalCapacity = sum(capacities);
|
||||
|
||||
// fill up the cluster to be `clusterUtilization` full
|
||||
long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
|
||||
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
|
||||
long totalDfsUsedSpace = (long) (totalCapacity * clusterUtilization);
|
||||
createFile(cluster, filePath, totalDfsUsedSpace / numOfDatanodes,
|
||||
(short) numOfDatanodes, 0);
|
||||
|
||||
conf.setLong(CONFIG_PROPERTY_NONDFSUSED, newNonDfsUsed);
|
||||
if (nodes == null) { // there is no specification of new nodes.
|
||||
// start up an empty node with the same capacity and on the same rack
|
||||
cluster.startDataNodes(conf, 1, true, null,
|
||||
@ -774,9 +778,11 @@ private void doTest(Configuration conf, long[] capacities,
|
||||
|
||||
// run balancer and validate results
|
||||
if (useTool) {
|
||||
runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
|
||||
runBalancerCli(conf, totalDfsUsedSpace, newNonDfsUsed,
|
||||
totalCapacity, p, useFile, expectedExcludedNodes);
|
||||
} else {
|
||||
runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
|
||||
runBalancer(conf, totalDfsUsedSpace, newNonDfsUsed,
|
||||
totalCapacity, p, expectedExcludedNodes, true);
|
||||
}
|
||||
} finally {
|
||||
if(cluster != null) {
|
||||
@ -791,16 +797,18 @@ private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
BalancerParameters.DEFAULT, 0);
|
||||
}
|
||||
|
||||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
private void runBalancer(Configuration conf, long totalDfsUsedSpace,
|
||||
long totalCapacity, BalancerParameters p, int excludedNodes)
|
||||
throws Exception {
|
||||
runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
|
||||
runBalancer(conf, totalDfsUsedSpace, 0, totalCapacity, p, excludedNodes,
|
||||
true);
|
||||
}
|
||||
|
||||
private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
long totalCapacity, BalancerParameters p, int excludedNodes,
|
||||
boolean checkExcludeNodesUtilization) throws Exception {
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
private void runBalancer(Configuration conf, long totalDfsUsedSpace,
|
||||
long totalNonDfsUsedSpace, long totalCapacity, BalancerParameters p,
|
||||
int excludedNodes, boolean checkExcludeNodesUtilization)
|
||||
throws Exception {
|
||||
waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster);
|
||||
|
||||
int retry = 5;
|
||||
while (retry > 0) {
|
||||
@ -816,9 +824,10 @@ private void runBalancer(Configuration conf, long totalUsedSpace,
|
||||
} else {
|
||||
assertEquals(ExitStatus.SUCCESS.getExitCode(), run);
|
||||
}
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster);
|
||||
LOG.info(" .");
|
||||
try {
|
||||
long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace;
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||
excludedNodes, checkExcludeNodesUtilization);
|
||||
} catch (TimeoutException e) {
|
||||
@ -892,10 +901,10 @@ private static int runBalancer(Collection<URI> namenodes,
|
||||
return ExitStatus.SUCCESS.getExitCode();
|
||||
}
|
||||
|
||||
private void runBalancerCli(Configuration conf, long totalUsedSpace,
|
||||
long totalCapacity, BalancerParameters p, boolean useFile,
|
||||
int expectedExcludedNodes) throws Exception {
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
private void runBalancerCli(Configuration conf, long totalDfsUsedSpace,
|
||||
long totalNonDfsUsedSpace, long totalCapacity, BalancerParameters p,
|
||||
boolean useFile, int expectedExcludedNodes) throws Exception {
|
||||
waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster);
|
||||
List <String> args = new ArrayList<String>();
|
||||
args.add("-policy");
|
||||
args.add("datanode");
|
||||
@ -939,8 +948,9 @@ private void runBalancerCli(Configuration conf, long totalUsedSpace,
|
||||
final int r = tool.run(args.toArray(new String[0])); // start rebalancing
|
||||
|
||||
assertEquals("Tools should exit 0 on success", 0, r);
|
||||
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
||||
waitForHeartBeat(totalDfsUsedSpace, totalCapacity, client, cluster);
|
||||
LOG.info("Rebalancing with default ctor.");
|
||||
long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace;
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
|
||||
|
||||
if (excludeHostsFile != null && excludeHostsFile.exists()) {
|
||||
@ -1112,6 +1122,16 @@ void testBalancer2Internal(Configuration conf) throws Exception {
|
||||
new String[]{RACK0, RACK1}, CAPACITY, RACK2);
|
||||
}
|
||||
|
||||
/** Test a cluster with even distribution,
|
||||
* then a new node with nonDfsUsed is added to the cluster. */
|
||||
@Test(timeout=100000)
|
||||
public void testBalancer3() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
|
||||
CAPACITY, 1000L, RACK2, null, false, false, false, 0.3);
|
||||
}
|
||||
|
||||
private void testBalancerDefaultConstructor(Configuration conf,
|
||||
long[] capacities, String[] racks, long newCapacity, String newRack)
|
||||
throws Exception {
|
||||
@ -1504,10 +1524,11 @@ public void testBalancerDuringUpgrade() throws Exception {
|
||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||
|
||||
final int BLOCK_SIZE = 1024*1024;
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageCapacities(new long[] { BLOCK_SIZE * 10 })
|
||||
.simulatedCapacities(new long[]{BLOCK_SIZE * 10})
|
||||
.storageTypes(new StorageType[] { DEFAULT })
|
||||
.storagesPerDatanode(1)
|
||||
.build();
|
||||
@ -1517,11 +1538,12 @@ public void testBalancerDuringUpgrade() throws Exception {
|
||||
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
|
||||
DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 3, BLOCK_SIZE,
|
||||
(short) 1, SEED);
|
||||
|
||||
// Add another DN with the same capacity, cluster is now unbalanced
|
||||
cluster.startDataNodes(conf, 1, true, null, null);
|
||||
cluster.startDataNodes(conf, 1, true, null, null, null,
|
||||
new long[]{BLOCK_SIZE * 10}, false);
|
||||
cluster.triggerHeartbeats();
|
||||
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
||||
|
||||
@ -1773,7 +1795,7 @@ private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) th
|
||||
pBuilder.setExcludedNodes(excludedList);
|
||||
|
||||
// start balancer and check the failed num of moving task
|
||||
runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
|
||||
runBalancer(conf, totalUsedSpace, 0, totalCapacity, pBuilder.build(),
|
||||
excludedList.size(), false);
|
||||
|
||||
// check total blocks, max wait time 60s
|
||||
@ -1891,7 +1913,7 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
|
||||
capacities[i] = CAPACITY;
|
||||
racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
|
||||
}
|
||||
doTest(conf, capacities, racks, CAPACITY, RACK2,
|
||||
doTest(conf, capacities, racks, CAPACITY, 0L, RACK2,
|
||||
// Use only 1 node and set the starting capacity to 50% to allow the
|
||||
// balancing to complete in only one iteration. This is necessary
|
||||
// because the startGetBlocksTime and endGetBlocksTime measures across
|
||||
|
@ -162,6 +162,11 @@ public static byte simulatedByte(Block b, long offsetInBlk) {
|
||||
private static final DatanodeStorage.State DEFAULT_STATE =
|
||||
DatanodeStorage.State.NORMAL;
|
||||
|
||||
public static final String CONFIG_PROPERTY_NONDFSUSED =
|
||||
"dfs.datanode.simulateddatastorage.nondfsused";
|
||||
|
||||
public static final long DEFAULT_NONDFSUSED = 0L;
|
||||
|
||||
static final byte[] nullCrcFileData;
|
||||
|
||||
private final DataNodeLockManager datasetLockManager;
|
||||
@ -467,11 +472,12 @@ static class SimulatedStorage {
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private final long capacity; // in bytes
|
||||
private long nonDfsUsed;
|
||||
private final DatanodeStorage dnStorage;
|
||||
private final SimulatedVolume volume;
|
||||
|
||||
synchronized long getFree() {
|
||||
return capacity - getUsed();
|
||||
return capacity - getUsed() - getNonDfsUsed();
|
||||
}
|
||||
|
||||
long getCapacity() {
|
||||
@ -486,6 +492,10 @@ synchronized long getUsed() {
|
||||
return used;
|
||||
}
|
||||
|
||||
synchronized long getNonDfsUsed() {
|
||||
return nonDfsUsed;
|
||||
}
|
||||
|
||||
synchronized long getBlockPoolUsed(String bpid) throws IOException {
|
||||
return getBPStorage(bpid).getUsed();
|
||||
}
|
||||
@ -506,7 +516,7 @@ synchronized void free(String bpid, long amount) throws IOException {
|
||||
getBPStorage(bpid).free(amount);
|
||||
}
|
||||
|
||||
SimulatedStorage(long cap, DatanodeStorage.State state,
|
||||
SimulatedStorage(long cap, DatanodeStorage.State state, long nonDfsUsed,
|
||||
FileIoProvider fileIoProvider, Configuration conf) {
|
||||
capacity = cap;
|
||||
dnStorage = new DatanodeStorage(
|
||||
@ -515,6 +525,7 @@ synchronized void free(String bpid, long amount) throws IOException {
|
||||
DataNodeVolumeMetrics volumeMetrics =
|
||||
DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
|
||||
this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
|
||||
this.nonDfsUsed = nonDfsUsed;
|
||||
}
|
||||
|
||||
synchronized void addBlockPool(String bpid) {
|
||||
@ -548,7 +559,7 @@ DatanodeStorage getDnStorage() {
|
||||
synchronized StorageReport getStorageReport(String bpid) {
|
||||
return new StorageReport(dnStorage,
|
||||
false, getCapacity(), getUsed(), getFree(),
|
||||
map.get(bpid).getUsed(), 0L);
|
||||
map.get(bpid).getUsed(), getNonDfsUsed());
|
||||
}
|
||||
|
||||
SimulatedVolume getVolume() {
|
||||
@ -733,6 +744,7 @@ public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration
|
||||
this.storages.add(new SimulatedStorage(
|
||||
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
|
||||
conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE),
|
||||
conf.getLong(CONFIG_PROPERTY_NONDFSUSED, DEFAULT_NONDFSUSED),
|
||||
fileIoProvider, conf));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user