HDFS-15716. WaitforReplication in TestUpgradeDomainBlockPlacementPolicy (#2528)

(cherry picked from commit 01383a2172)
This commit is contained in:
Ahmed Hussein 2020-12-08 13:25:24 -06:00 committed by Jim Brennan
parent cb2dce30d4
commit e02b179c4c

View File

@ -17,16 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -41,16 +38,14 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager;
import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.function.Supplier;
/** /**
* End-to-end test case for upgrade domain * End-to-end test case for upgrade domain
* The test configs upgrade domain for nodes via admin json * The test configs upgrade domain for nodes via admin json
@ -63,6 +58,8 @@ public class TestUpgradeDomainBlockPlacementPolicy {
private static final short REPLICATION_FACTOR = (short) 3; private static final short REPLICATION_FACTOR = (short) 3;
private static final int DEFAULT_BLOCK_SIZE = 1024; private static final int DEFAULT_BLOCK_SIZE = 1024;
private static final int WAIT_TIMEOUT_MS = 60000;
private static final long FILE_SIZE = DEFAULT_BLOCK_SIZE * 5;
static final String[] racks = static final String[] racks =
{ "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" }; { "/RACK1", "/RACK1", "/RACK1", "/RACK2", "/RACK2", "/RACK2" };
static final String[] hosts = static final String[] hosts =
@ -71,9 +68,6 @@ public class TestUpgradeDomainBlockPlacementPolicy {
{"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"}; {"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"};
static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>(); static final Set<DatanodeID> expectedDatanodeIDs = new HashSet<>();
private MiniDFSCluster cluster = null; private MiniDFSCluster cluster = null;
private NamenodeProtocols nameNodeRpc = null;
private FSNamesystem namesystem = null;
private PermissionStatus perm = null;
private HostsFileWriter hostsFileWriter = new HostsFileWriter(); private HostsFileWriter hostsFileWriter = new HostsFileWriter();
@Before @Before
@ -92,10 +86,6 @@ public void setup() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks)
.hosts(hosts).build(); .hosts(hosts).build();
cluster.waitActive(); cluster.waitActive();
nameNodeRpc = cluster.getNameNodeRpc();
namesystem = cluster.getNamesystem();
perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null,
FsPermission.getDefault());
refreshDatanodeAdminProperties(); refreshDatanodeAdminProperties();
} }
@ -186,43 +176,51 @@ private void refreshDatanodeAdminProperties2()
expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId()); expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId());
} }
private void createFileAndWaitForReplication(final Path path,
final long fileLen)
throws Exception {
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileLen,
REPLICATION_FACTOR, 1000L);
DFSTestUtil.waitForReplication(cluster.getFileSystem(), path,
REPLICATION_FACTOR, WAIT_TIMEOUT_MS);
}
@Test @Test
public void testPlacement() throws Exception { public void testPlacement() throws Exception {
final long fileSize = DEFAULT_BLOCK_SIZE * 5; final long fileSize = FILE_SIZE;
final String testFile = new String("/testfile"); final String testFile = "/testfile";
final Path path = new Path(testFile); final Path path = new Path(testFile);
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, createFileAndWaitForReplication(path, FILE_SIZE);
REPLICATION_FACTOR, 1000L);
LocatedBlocks locatedBlocks = LocatedBlocks locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks( cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize); path.toString(), 0, fileSize);
for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>(); Set<DatanodeInfo> locs = new HashSet<>();
for(DatanodeInfo datanodeInfo : block.getLocations()) { for(DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) { if (datanodeInfo.getAdminState()
.equals(DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo); locs.add(datanodeInfo);
} }
} }
for (DatanodeID datanodeID : expectedDatanodeIDs) { for (DatanodeID datanodeID : expectedDatanodeIDs) {
assertTrue(locs.contains(datanodeID)); Assert.assertTrue(locs.contains(datanodeID));
} }
} }
} }
@Test(timeout = 300000) @Test(timeout = 300000)
public void testPlacementAfterDecommission() throws Exception { public void testPlacementAfterDecommission() throws Exception {
final long fileSize = DEFAULT_BLOCK_SIZE * 5; final long fileSize = FILE_SIZE;
final String testFile = new String("/testfile"); final String testFile = "/testfile-afterdecomm";
final Path path = new Path(testFile); final Path path = new Path(testFile);
DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, createFileAndWaitForReplication(path, fileSize);
REPLICATION_FACTOR, 1000L);
// Decommission some nodes and wait until decommissions have finished. // Decommission some nodes and wait until decommissions have finished.
refreshDatanodeAdminProperties2(); refreshDatanodeAdminProperties2();
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
boolean successful = true;
LocatedBlocks locatedBlocks; LocatedBlocks locatedBlocks;
try { try {
locatedBlocks = locatedBlocks =
@ -231,32 +229,34 @@ public Boolean get() {
} catch (IOException ioe) { } catch (IOException ioe) {
return false; return false;
} }
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
Set<DatanodeInfo> locs = new HashSet<>(); Set<DatanodeInfo> locs = new HashSet<>();
for (DatanodeInfo datanodeInfo : block.getLocations()) { for (DatanodeInfo datanodeInfo : block.getLocations()) {
if (datanodeInfo.getAdminState() == if (datanodeInfo.getAdminState().equals(
DatanodeInfo.AdminStates.NORMAL) { DatanodeInfo.AdminStates.NORMAL)) {
locs.add(datanodeInfo); locs.add(datanodeInfo);
} }
} }
for (DatanodeID datanodeID : expectedDatanodeIDs) { for (DatanodeID datanodeID : expectedDatanodeIDs) {
successful = successful && locs.contains(datanodeID); if (!locs.contains(datanodeID)) {
return false;
}
} }
} }
return successful; return true;
} }
}, 1000, 60000); }, 1000, WAIT_TIMEOUT_MS);
// Verify block placement policy of each block. // Verify block placement policy of each block.
LocatedBlocks locatedBlocks; LocatedBlocks locatedBlocks =
locatedBlocks =
cluster.getFileSystem().getClient().getLocatedBlocks( cluster.getFileSystem().getClient().getLocatedBlocks(
path.toString(), 0, fileSize); path.toString(), 0, fileSize);
for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) {
BlockPlacementStatus status = cluster.getNamesystem().getBlockManager(). BlockPlacementStatus status =
getBlockPlacementPolicy().verifyBlockPlacement( cluster.getNamesystem().getBlockManager()
block.getLocations(), REPLICATION_FACTOR); .getBlockPlacementPolicy()
assertTrue(status.isPlacementPolicySatisfied()); .verifyBlockPlacement(block.getLocations(), REPLICATION_FACTOR);
Assert.assertTrue(status.isPlacementPolicySatisfied());
} }
} }
} }