From 4812518b23cac496ab5cdad5258773bcd9728770 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 16 Mar 2017 15:07:38 -0700 Subject: [PATCH] HDFS-10530. BlockManager reconstruction work scheduling should correctly adhere to EC block placement policy. Contributed by Manoj Govindassamy and Rui Gao. --- .../server/blockmanagement/BlockManager.java | 2 +- .../hdfs/server/balancer/TestBalancer.java | 11 +- .../blockmanagement/TestBlockManager.java | 113 +++++++++++++++++- 3 files changed, 119 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 5dc40fa10e..be30e787cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -4179,7 +4179,7 @@ boolean isPlacementPolicySatisfied(BlockInfo storedBlock) { BlockPlacementPolicy placementPolicy = placementPolicies .getPolicy(blockType); int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock) - .getRealDataBlockNum() : storedBlock.getReplication(); + .getRealTotalBlockNum() : storedBlock.getReplication(); return placementPolicy.verifyBlockPlacement(locs, numReplicas) .isPlacementPolicySatisfied(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index d2e8f4485e..30a3a327d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1918,7 +1918,7 @@ public void testBalancerWithStripedFile() throws Exception { } private void doTestBalancerWithStripedFile(Configuration conf) throws Exception { - int numOfDatanodes = dataBlocks + parityBlocks + 2; + int numOfDatanodes = dataBlocks + parityBlocks + 3; int numOfRacks = dataBlocks; long capacity = 20 * defaultBlockSize; long[] capacities = new long[numOfDatanodes]; @@ -1956,11 +1956,12 @@ private void doTestBalancerWithStripedFile(Configuration conf) throws Exception LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); - // add one datanode + // add datanodes in new rack String newRack = "/rack" + (++numOfRacks); - cluster.startDataNodes(conf, 1, true, null, - new String[]{newRack}, null, new long[]{capacity}); - totalCapacity += capacity; + cluster.startDataNodes(conf, 2, true, null, + new String[]{newRack, newRack}, null, + new long[]{capacity, capacity}); + totalCapacity += capacity*2; cluster.triggerHeartbeats(); // run balancer and validate results diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 00bea1c388..36dafa5f78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; @@ -55,6 +56,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; @@ -68,6 +71,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -125,13 +129,14 @@ public class TestBlockManager { * of times trying to trigger the incorrect behavior. */ private static final int NUM_TEST_ITERS = 30; - private static final int BLOCK_SIZE = 64*1024; + private static final Log LOG = LogFactory.getLog(TestBlockManager.class); private FSNamesystem fsn; private BlockManager bm; private long mockINodeId; + @Before public void setupMockCluster() throws IOException { Configuration conf = new HdfsConfiguration(); @@ -1287,4 +1292,110 @@ public void testIsReplicaCorruptCall() throws Exception { isReplicaCorrupt(Mockito.any(BlockInfo.class), Mockito.any(DatanodeDescriptor.class)); } + + @Test (timeout = 300000) + public void testPlacementPolicySatisfied() throws Exception { + LOG.info("Starting testPlacementPolicySatisfied."); + final String[] initialRacks = new String[]{ + "/rack0", "/rack1", "/rack2", "/rack3", "/rack4", "/rack5"}; + final String[] initialHosts = new String[]{ + "host0", "host1", "host2", "host3", "host4", "host5"}; + final int numDataBlocks = StripedFileTestUtil.getDefaultECPolicy() + .getNumDataUnits(); + final int numParityBlocks = StripedFileTestUtil.getDefaultECPolicy() + .getNumParityUnits(); + final long blockSize = 64 * 1024; + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + StripedFileTestUtil.getDefaultECPolicy().getName()); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf) + .racks(initialRacks) + .hosts(initialHosts) + .numDataNodes(initialRacks.length) + .build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final Path ecDir = new Path("/ec"); + final Path testFileUnsatisfied = new Path(ecDir, "test1"); + final Path testFileSatisfied = new Path(ecDir, "test2"); + cluster.getFileSystem().getClient().mkdirs(ecDir.toString(), null, true); + cluster.getFileSystem().getClient() + .setErasureCodingPolicy(ecDir.toString(), + StripedFileTestUtil.getDefaultECPolicy().getName()); + long fileLen = blockSize * numDataBlocks; + + // Create a file to be stored in 6 racks. + DFSTestUtil.createFile(dfs, testFileUnsatisfied, fileLen, (short) 1, 1); + // Block placement policy should be satisfied as rack count + // is less than numDataBlocks + numParityBlocks. + verifyPlacementPolicy(cluster, testFileUnsatisfied, true); + + LOG.info("Adding 3 new hosts in the existing racks."); + cluster.startDataNodes(conf, 3, true, null, + new String[]{"/rack3", "/rack4", "/rack5"}, + new String[]{"host3-2", "host4-2", "host5-2"}, null); + cluster.triggerHeartbeats(); + + LOG.info("Waiting for EC reconstruction to complete."); + DFSTestUtil.waitForReplication(dfs, testFileUnsatisfied, + (short)(numDataBlocks + numParityBlocks), 30 * 1000); + // Block placement policy should still be satisfied + // as there are only 6 racks. + verifyPlacementPolicy(cluster, testFileUnsatisfied, true); + + LOG.info("Adding 3 new hosts in 3 new racks."); + cluster.startDataNodes(conf, 3, true, null, + new String[]{"/rack6", "/rack7", "/rack8"}, + new String[]{"host6", "host7", "host8"}, + null); + cluster.triggerHeartbeats(); + // Addition of new racks can make the existing EC files block + // placements unsatisfied and there is NO automatic block + // reconstruction for this yet. + // TODO: + // Verify for block placement satisfied once the automatic + // block reconstruction is implemented. + verifyPlacementPolicy(cluster, testFileUnsatisfied, false); + + // Create a new file + DFSTestUtil.createFile(dfs, testFileSatisfied, fileLen, (short) 1, 1); + // The new file should be rightly placed on all 9 racks + // and the block placement policy should be satisfied. + verifyPlacementPolicy(cluster, testFileUnsatisfied, false); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void verifyPlacementPolicy(final MiniDFSCluster cluster, + final Path file, boolean isBlockPlacementSatisfied) throws IOException { + DistributedFileSystem dfs = cluster.getFileSystem(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs, file).get(0); + BlockInfo blockInfo = + blockManager.getStoredBlock(lb.getBlock().getLocalBlock()); + Iterator itr = blockInfo.getStorageInfos(); + LOG.info("Block " + blockInfo + " storages: "); + while (itr.hasNext()) { + DatanodeStorageInfo dn = itr.next(); + LOG.info(" Rack: " + dn.getDatanodeDescriptor().getNetworkLocation() + + ", DataNode: " + dn.getDatanodeDescriptor().getXferAddr()); + } + if (isBlockPlacementSatisfied) { + assertTrue("Block group of " + file + "should be placement" + + " policy satisfied, currently!", + blockManager.isPlacementPolicySatisfied(blockInfo)); + } else { + assertFalse("Block group of " + file + " should be placement" + + " policy unsatisfied, currently!", + blockManager.isPlacementPolicySatisfied(blockInfo)); + } + } + }