HDFS-15989. Split TestBalancer and De-flake testMaxIterationTime() (#2942)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
Viraj Jasani 2021-04-23 09:27:23 +05:30 committed by GitHub
parent 2752156e3e
commit 96a1dfa313
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 627 additions and 498 deletions

View File

@ -19,7 +19,6 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@ -28,22 +27,16 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.junit.AfterClass;
@ -56,10 +49,8 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -86,8 +77,6 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
@ -107,14 +96,9 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -185,7 +169,6 @@ public void shutdown() throws Exception {
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 100;
static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
private static final Random r = new Random();
static {
@ -211,20 +194,6 @@ static void initConf(Configuration conf) {
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
}
static void initConfWithRamDisk(Configuration conf,
long ramDiskCapacity) {
conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
LazyPersistTestCase.initCacheManipulator();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
}
private final ErasureCodingPolicy ecPolicy =
StripedFileTestUtil.getDefaultECPolicy();
private final int dataBlocks = ecPolicy.getNumDataUnits();
@ -485,160 +454,6 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
}
/**
* Make sure that balancer can't move pinned blocks.
* If specified favoredNodes when create file, blocks will be pinned use
* sticky bit.
* @throws Exception
*/
@Test(timeout=100000)
public void testBalancerWithPinnedBlocks() throws Exception {
// This test assumes stick-bit based block pin mechanism available only
// in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to
// provide a different mechanism for Windows.
assumeNotWindows();
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
long[] capacities = new long[] { CAPACITY, CAPACITY };
String[] hosts = {"host0", "host1"};
String[] racks = { RACK0, RACK1 };
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
for (int i = 0; i < favoredNodes.length; i++) {
// DFSClient will attempt reverse lookup. In case it resolves
// "127.0.0.1" to "localhost", we manually specify the hostname.
int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
favoredNodes[i] = new InetSocketAddress(hosts[i], port);
}
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
(short) numOfDatanodes, 0, false, favoredNodes);
// start up an empty node with the same capacity
cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
new long[] { CAPACITY });
totalCapacity += CAPACITY;
// run balancer and validate results
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/**
* Verify balancer won't violate the default block placement policy.
* @throws Exception
*/
@Test(timeout=100000)
public void testRackPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
long[] capacities = new long[] { CAPACITY, CAPACITY };
String[] hosts = {"host0", "host1"};
String[] racks = { RACK0, RACK1 };
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
null, CAPACITY, "host2", RACK1, null);
}
/**
* Verify balancer won't violate upgrade domain block placement policy.
* @throws Exception
*/
@Test(timeout=100000)
public void testUpgradeDomainPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class);
long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY };
String[] hosts = {"host0", "host1", "host2"};
String[] racks = { RACK0, RACK1, RACK1 };
String[] UDs = { "ud0", "ud1", "ud2" };
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
UDs, CAPACITY, "host3", RACK2, "ud2");
}
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
long[] capacities, String[] hosts, String[] racks, String[] UDs,
long newCapacity, String newHost, String newRack, String newUD)
throws Exception {
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
DatanodeManager dm = cluster.getNamesystem().getBlockManager().
getDatanodeManager();
if (UDs != null) {
for(int i = 0; i < UDs.length; i++) {
DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
}
}
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
final long fileSize = totalUsedSpace / numOfDatanodes;
DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
// start up an empty node with the same capacity on the same rack as the
// pinned host.
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new String[] { newHost }, new long[] { newCapacity });
if (newUD != null) {
DatanodeID newId = cluster.getDataNodes().get(
numOfDatanodes).getDatanodeId();
dm.getDatanode(newId).setUpgradeDomain(newUD);
}
totalCapacity += newCapacity;
// run balancer and validate results
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
BlockPlacementPolicy placementPolicy =
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
List<LocatedBlock> locatedBlocks = client.
getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
locatedBlock.getLocations(), numOfDatanodes);
assertTrue(status.isPlacementPolicySatisfied());
}
} finally {
cluster.shutdown();
}
}
/**
* Wait until balanced: each datanode gives utilization within
* BALANCE_ALLOWED_VARIANCE of average
@ -1598,144 +1413,6 @@ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
}
@Test(timeout = 100000)
public void testMaxIterationTime() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
int blockSize = 10*1024*1024; // 10MB block size
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
// limit the worker thread count of Balancer to have only 1 queue per DN
conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
// limit the bandwidth to 4MB per sec to emulate slow block moves
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
4 * 1024 * 1024);
// set client socket timeout to have an IN_PROGRESS notification back from
// the DataNode about the copy in every second.
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
// set max iteration time to 2 seconds to timeout before moving any block
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
// setup the cluster
final long capacity = 10L * blockSize;
final long[] dnCapacities = new long[] {capacity, capacity};
final short rep = 1;
final long seed = 0xFAFAFA;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.build();
try {
cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
cluster.waitClusterUp();
cluster.waitActive();
final Path path = new Path("/testMaxIterationTime.dat");
DistributedFileSystem fs = cluster.getFileSystem();
// fill the DN to 40%
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
// start a new DN
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
cluster.triggerHeartbeats();
// setup Balancer and run one iteration
List<NameNodeConnector> connectors = Collections.emptyList();
try {
BalancerParameters bParams = BalancerParameters.DEFAULT;
// set maxIdleIterations to 1 for NO_MOVE_PROGRESS to be
// reported when there is no block move
connectors = NameNodeConnector.newNameNodeConnectors(
DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
Balancer.BALANCER_ID_PATH, conf, 1);
for (NameNodeConnector nnc : connectors) {
LOG.info("NNC to work on: " + nnc);
Balancer b = new Balancer(nnc, bParams, conf);
Result r = b.runOneIteration();
// Since no block cannot be moved in 2 seconds (i.e.,
// 4MB/s * 2s = 8MB < 10MB), NO_MOVE_PROGRESS will be reported.
// When a block move is not canceled in 2 seconds properly and then
// a block is moved unexpectedly, IN_PROGRESS will be reported.
assertEquals("We expect ExitStatus.NO_MOVE_PROGRESS to be reported.",
ExitStatus.NO_MOVE_PROGRESS, r.getExitStatus());
assertEquals(0, r.getBlocksMoved());
}
} finally {
for (NameNodeConnector nnc : connectors) {
IOUtils.cleanupWithLogger(null, nnc);
}
}
} finally {
cluster.shutdown(true, true);
}
}
/*
* Test Balancer with Ram_Disk configured
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
* Then verify that the balancer does not migrate files on RAM_DISK across DN.
*/
@Test(timeout=300000)
public void testBalancerWithRamDisk() throws Exception {
final int SEED = 0xFADED;
final short REPL_FACT = 1;
Configuration conf = new Configuration();
final int defaultRamDiskCapacity = 10;
final long ramDiskStorageLimit =
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
final long diskStorageLimit =
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
initConfWithRamDisk(conf, ramDiskStorageLimit);
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)
.storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
.storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
.build();
cluster.waitActive();
// Create few files on RAM_DISK
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient();
DFSTestUtil.createFile(fs, path1, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
DFSTestUtil.createFile(fs, path2, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * 1000);
// Add another fresh DN with the same type/capacity without files on RAM_DISK
StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
diskStorageLimit}};
cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
null, null, storageCapacities, null, false, false, false, null);
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
// Run Balancer
final BalancerParameters p = BalancerParameters.DEFAULT;
final int r = Balancer.run(namenodes, p, conf);
// Validate no RAM_DISK block should be moved
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
// Verify files are still on RAM_DISK
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
}
/**
* Check that the balancer exits when there is an unfinalized upgrade.
*/
@ -1798,66 +1475,6 @@ public void testBalancerDuringUpgrade() throws Exception {
Balancer.run(namenodes, p, conf));
}
/**
* Test special case. Two replicas belong to same block should not in same node.
* We have 2 nodes.
* We have a block in (DN0,SSD) and (DN1,DISK).
* Replica in (DN0,SSD) should not be moved to (DN1,SSD).
* Otherwise DN1 has 2 replicas.
*/
@Test(timeout=100000)
public void testTwoReplicaShouldNotInSameDN() throws Exception {
final Configuration conf = new HdfsConfiguration();
int blockSize = 5 * 1024 * 1024 ;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
int numOfDatanodes =2;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.racks(new String[]{"/default/rack0", "/default/rack0"})
.storagesPerDatanode(2)
.storageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}})
.storageCapacities(new long[][]{
{100 * blockSize, 20 * blockSize},
{20 * blockSize, 100 * blockSize}})
.build();
cluster.waitActive();
//set "/bar" directory with ONE_SSD storage policy.
DistributedFileSystem fs = cluster.getFileSystem();
Path barDir = new Path("/bar");
fs.mkdir(barDir,new FsPermission((short)777));
fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
// Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
// and (DN0,SSD) and (DN1,DISK) are about 15% full.
long fileLen = 30 * blockSize;
// fooFile has ONE_SSD policy. So
// (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
// (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
Path fooFile = new Path(barDir, "foo");
createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
// update space info
cluster.triggerHeartbeats();
BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf);
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
// already has one. Otherwise DN1 will have 2 replicas.
// For same reason, no replicas were moved.
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/**
* Test running many balancer simultaneously.
*
@ -1929,121 +1546,6 @@ public void testManyBalancerSimultaneously() throws Exception {
ExitStatus.SUCCESS.getExitCode(), exitCode);
}
/** Balancer should not move blocks with size < minBlockSize. */
@Test(timeout=60000)
public void testMinBlockSizeAndSourceNodes() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final short replication = 3;
final long[] lengths = {10, 10, 10, 10};
final long[] capacities = new long[replication];
final long totalUsed = capacities.length * sum(lengths);
Arrays.fill(capacities, 1000);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.simulatedCapacities(capacities)
.build();
final DistributedFileSystem dfs = cluster.getFileSystem();
cluster.waitActive();
client = NameNodeProxies.createProxy(conf, dfs.getUri(),
ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
for(int i = 0; i < lengths.length; i++) {
final long size = lengths[i];
final Path p = new Path("/file" + i + "_size" + size);
try(OutputStream out = dfs.create(p)) {
for(int j = 0; j < size; j++) {
out.write(j);
}
}
}
// start up an empty node with the same capacity
cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
LOG.info("capacities = " + Arrays.toString(capacities));
LOG.info("totalUsedSpace= " + totalUsed);
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
{ // run Balancer with min-block-size=50
final BalancerParameters p = Balancer.Cli.parse(new String[] {
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1"
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
{ // run Balancer with empty nodes as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for(int i = capacities.length; i < datanodes.size(); i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final BalancerParameters p = Balancer.Cli.parse(new String[] {
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with a filled node as a source node
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
sourceNodes.add(datanodes.get(0).getDisplayName());
final BalancerParameters p = Balancer.Cli.parse(new String[] {
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with all filled node as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for(int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final BalancerParameters p = Balancer.Cli.parse(new String[] {
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
}
public void integrationTestWithStripedFile(Configuration conf) throws Exception {
initConfWithStripe(conf);
doTestBalancerWithStripedFile(conf);

View File

@ -0,0 +1,627 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Some long running Balancer tasks.
*/
public class TestBalancerLongRunningTasks {
private static final Logger LOG =
LoggerFactory.getLogger(TestBalancerLongRunningTasks.class);
static {
GenericTestUtils.setLogLevel(Balancer.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
}
private final static long CAPACITY = 5000L;
private final static String RACK0 = "/rack0";
private final static String RACK1 = "/rack1";
private final static String RACK2 = "/rack2";
private final static String FILE_NAME = "/tmp.txt";
private final static Path FILE_PATH = new Path(FILE_NAME);
private MiniDFSCluster cluster;
@After
public void shutdown() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
private ClientProtocol client;
static final int DEFAULT_BLOCK_SIZE = 100;
static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
static {
initTestSetup();
}
public static void initTestSetup() {
// do not create id file since it occupies the disk space
NameNodeConnector.setWrite2IdFile(false);
}
static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
SimulatedFSDataset.setFactory(conf);
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5 * 1000);
}
static void initConfWithRamDisk(Configuration conf,
long ramDiskCapacity) {
conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5 * 1000);
LazyPersistTestCase.initCacheManipulator();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
}
/**
* Test special case. Two replicas belong to same block should not in same
* node.
* We have 2 nodes.
* We have a block in (DN0,SSD) and (DN1,DISK).
* Replica in (DN0,SSD) should not be moved to (DN1,SSD).
* Otherwise DN1 has 2 replicas.
*/
@Test(timeout = 100000)
public void testTwoReplicaShouldNotInSameDN() throws Exception {
final Configuration conf = new HdfsConfiguration();
int blockSize = 5 * 1024 * 1024;
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
1L);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
int numOfDatanodes = 2;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.racks(new String[]{"/default/rack0", "/default/rack0"})
.storagesPerDatanode(2)
.storageTypes(new StorageType[][]{
{StorageType.SSD, StorageType.DISK},
{StorageType.SSD, StorageType.DISK}})
.storageCapacities(new long[][]{
{100 * blockSize, 20 * blockSize},
{20 * blockSize, 100 * blockSize}})
.build();
cluster.waitActive();
//set "/bar" directory with ONE_SSD storage policy.
DistributedFileSystem fs = cluster.getFileSystem();
Path barDir = new Path("/bar");
fs.mkdir(barDir, new FsPermission((short) 777));
fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
// Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
// and (DN0,SSD) and (DN1,DISK) are about 15% full.
long fileLen = 30 * blockSize;
// fooFile has ONE_SSD policy. So
// (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
// (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
Path fooFile = new Path(barDir, "foo");
TestBalancer.createFile(cluster, fooFile, fileLen, (short) numOfDatanodes,
0);
// update space info
cluster.triggerHeartbeats();
BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf);
// Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
// already has one. Otherwise DN1 will have 2 replicas.
// For same reason, no replicas were moved.
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/*
* Test Balancer with Ram_Disk configured
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
* Then verify that the balancer does not migrate files on RAM_DISK across DN.
*/
@Test(timeout = 300000)
public void testBalancerWithRamDisk() throws Exception {
final int seed = 0xFADED;
final short replicationFactor = 1;
Configuration conf = new Configuration();
final int defaultRamDiskCapacity = 10;
final long ramDiskStorageLimit =
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
final long diskStorageLimit =
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
initConfWithRamDisk(conf, ramDiskStorageLimit);
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)
.storageCapacities(new long[]{ramDiskStorageLimit, diskStorageLimit})
.storageTypes(new StorageType[]{RAM_DISK, DEFAULT})
.build();
cluster.waitActive();
// Create few files on RAM_DISK
final String methodName = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + methodName + ".01.dat");
final Path path2 = new Path("/" + methodName + ".02.dat");
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient dfsClient = fs.getClient();
DFSTestUtil.createFile(fs, path1, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, replicationFactor, seed, true);
DFSTestUtil.createFile(fs, path2, true,
DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
DEFAULT_RAM_DISK_BLOCK_SIZE, replicationFactor, seed, true);
// Sleep for a short time to allow the lazy writer thread to do its job
Thread.sleep(6 * 1000);
// Add another fresh DN with the same type/capacity without files on
// RAM_DISK
StorageType[][] storageTypes = new StorageType[][]{{RAM_DISK, DEFAULT}};
long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
diskStorageLimit}};
cluster.startDataNodes(conf, replicationFactor, storageTypes, true, null,
null, null, storageCapacities, null, false, false, false, null);
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
// Run Balancer
final BalancerParameters p = BalancerParameters.DEFAULT;
final int r = Balancer.run(namenodes, p, conf);
// Validate no RAM_DISK block should be moved
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
// Verify files are still on RAM_DISK
DFSTestUtil.verifyFileReplicasOnStorageType(fs, dfsClient, path1, RAM_DISK);
DFSTestUtil.verifyFileReplicasOnStorageType(fs, dfsClient, path2, RAM_DISK);
}
/**
* Balancer should not move blocks with size < minBlockSize.
*/
@Test(timeout = 60000)
public void testMinBlockSizeAndSourceNodes() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final short replication = 3;
final long[] lengths = {10, 10, 10, 10};
final long[] capacities = new long[replication];
final long totalUsed = capacities.length * TestBalancer.sum(lengths);
Arrays.fill(capacities, 1000);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.simulatedCapacities(capacities)
.build();
final DistributedFileSystem dfs = cluster.getFileSystem();
cluster.waitActive();
client = NameNodeProxies.createProxy(conf, dfs.getUri(),
ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
for (int i = 0; i < lengths.length; i++) {
final long size = lengths[i];
final Path p = new Path("/file" + i + "_size" + size);
try (OutputStream out = dfs.create(p)) {
for (int j = 0; j < size; j++) {
out.write(j);
}
}
}
// start up an empty node with the same capacity
cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
LOG.info("capacities = " + Arrays.toString(capacities));
LOG.info("totalUsedSpace= " + totalUsed);
LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
TestBalancer.waitForHeartBeat(totalUsed,
2 * capacities[0] * capacities.length, client, cluster);
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
{ // run Balancer with min-block-size=50
final BalancerParameters p = Balancer.Cli.parse(new String[]{
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1"
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
{ // run Balancer with empty nodes as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for (int i = capacities.length; i < datanodes.size(); i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final BalancerParameters p = Balancer.Cli.parse(new String[]{
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with a filled node as a source node
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
sourceNodes.add(datanodes.get(0).getDisplayName());
final BalancerParameters p = Balancer.Cli.parse(new String[]{
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
}
{ // run Balancer with all filled node as source nodes
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
for (int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final BalancerParameters p = Balancer.Cli.parse(new String[]{
"-policy", BalancingPolicy.Node.INSTANCE.getName(),
"-threshold", "1",
"-source", StringUtils.join(sourceNodes, ',')
});
assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
assertEquals(p.getThreshold(), 1.0, 0.001);
assertEquals(p.getSourceNodes(), sourceNodes);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
}
/**
* Verify balancer won't violate upgrade domain block placement policy.
*
* @throws Exception
*/
@Test(timeout = 100000)
public void testUpgradeDomainPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithUpgradeDomain.class,
BlockPlacementPolicy.class);
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY};
String[] hosts = {"host0", "host1", "host2"};
String[] racks = {RACK0, RACK1, RACK1};
String[] uds = {"ud0", "ud1", "ud2"};
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
uds, CAPACITY, "host3", RACK2, "ud2");
}
/**
* Verify balancer won't violate the default block placement policy.
*
* @throws Exception
*/
@Test(timeout = 100000)
public void testRackPolicyAfterBalance() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
long[] capacities = new long[]{CAPACITY, CAPACITY};
String[] hosts = {"host0", "host1"};
String[] racks = {RACK0, RACK1};
runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
null, CAPACITY, "host2", RACK1, null);
}
private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
long[] capacities, String[] hosts, String[] racks, String[] UDs,
long newCapacity, String newHost, String newRack, String newUD)
throws Exception {
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
DatanodeManager dm = cluster.getNamesystem().getBlockManager().
getDatanodeManager();
if (UDs != null) {
for (int i = 0; i < UDs.length; i++) {
DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
}
}
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = TestBalancer.sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
final long fileSize = totalUsedSpace / numOfDatanodes;
DFSTestUtil.createFile(cluster.getFileSystem(0), FILE_PATH, false, 1024,
fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
// start up an empty node with the same capacity on the same rack as the
// pinned host.
cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
new String[]{newHost}, new long[]{newCapacity});
if (newUD != null) {
DatanodeID newId = cluster.getDataNodes().get(
numOfDatanodes).getDatanodeId();
dm.getDatanode(newId).setUpgradeDomain(newUD);
}
totalCapacity += newCapacity;
// run balancer and validate results
TestBalancer.waitForHeartBeat(totalUsedSpace,
totalCapacity, client, cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
BlockPlacementPolicy placementPolicy =
cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
List<LocatedBlock> locatedBlocks = client.
getBlockLocations(FILE_NAME, 0, fileSize).getLocatedBlocks();
for (LocatedBlock locatedBlock : locatedBlocks) {
BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
locatedBlock.getLocations(), numOfDatanodes);
assertTrue(status.isPlacementPolicySatisfied());
}
} finally {
cluster.shutdown();
}
}
/**
* Make sure that balancer can't move pinned blocks.
* If specified favoredNodes when create file, blocks will be pinned use
* sticky bit.
*
* @throws Exception
*/
@Test(timeout = 100000)
public void testBalancerWithPinnedBlocks() throws Exception {
// This test assumes stick-bit based block pin mechanism available only
// in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to
// provide a different mechanism for Windows.
assumeNotWindows();
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
long[] capacities = new long[]{CAPACITY, CAPACITY};
String[] hosts = {"host0", "host1"};
String[] racks = {RACK0, RACK1};
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
// fill up the cluster to be 80% full
long totalCapacity = TestBalancer.sum(capacities);
long totalUsedSpace = totalCapacity * 8 / 10;
InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
for (int i = 0; i < favoredNodes.length; i++) {
// DFSClient will attempt reverse lookup. In case it resolves
// "127.0.0.1" to "localhost", we manually specify the hostname.
int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
favoredNodes[i] = new InetSocketAddress(hosts[i], port);
}
DFSTestUtil.createFile(cluster.getFileSystem(0), FILE_PATH, false, 1024,
totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
(short) numOfDatanodes, 0, false, favoredNodes);
// start up an empty node with the same capacity
cluster.startDataNodes(conf, 1, true, null, new String[]{RACK2},
new long[]{CAPACITY});
totalCapacity += CAPACITY;
// run balancer and validate results
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,
cluster);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
@Test(timeout = 100000)
public void testMaxIterationTime() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
int blockSize = 10 * 1024 * 1024; // 10MB block size
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
// limit the worker thread count of Balancer to have only 1 queue per DN
conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
// limit the bandwidth to 4MB per sec to emulate slow block moves
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
4 * 1024 * 1024);
// set client socket timeout to have an IN_PROGRESS notification back from
// the DataNode about the copy in every second.
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
// set max iteration time to 500 ms to timeout before moving any block
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 500L);
// setup the cluster
final long capacity = 10L * blockSize;
final long[] dnCapacities = new long[]{capacity, capacity};
final short rep = 1;
final long seed = 0xFAFAFA;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.build();
try {
cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
cluster.waitClusterUp();
cluster.waitActive();
final Path path = new Path("/testMaxIterationTime.dat");
DistributedFileSystem fs = cluster.getFileSystem();
// fill the DN to 40%
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
// start a new DN
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
cluster.triggerHeartbeats();
// setup Balancer and run one iteration
List<NameNodeConnector> connectors = Collections.emptyList();
try {
BalancerParameters bParams = BalancerParameters.DEFAULT;
// set maxIdleIterations to 1 for NO_MOVE_PROGRESS to be
// reported when there is no block move
connectors = NameNodeConnector.newNameNodeConnectors(
DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
Balancer.BALANCER_ID_PATH, conf, 1);
for (NameNodeConnector nnc : connectors) {
LOG.info("NNC to work on: " + nnc);
Balancer b = new Balancer(nnc, bParams, conf);
Balancer.Result r = b.runOneIteration();
// Since no block can be moved in 500 milli-seconds (i.e.,
// 4MB/s * 0.5s = 2MB < 10MB), NO_MOVE_PROGRESS will be reported.
// When a block move is not canceled in 500 ms properly
// (highly unlikely) and then a block is moved unexpectedly,
// IN_PROGRESS will be reported. This is highly unlikely unexpected
// case. See HDFS-15989.
assertEquals("We expect ExitStatus.NO_MOVE_PROGRESS to be reported.",
ExitStatus.NO_MOVE_PROGRESS, r.getExitStatus());
assertEquals(0, r.getBlocksMoved());
}
} finally {
for (NameNodeConnector nnc : connectors) {
IOUtils.cleanupWithLogger(null, nnc);
}
}
} finally {
cluster.shutdown(true, true);
}
}
}