diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java index e2e1486dc8..4b3c3cc383 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusWithUpgradeDomain.java @@ -60,7 +60,7 @@ public boolean isPlacementPolicySatisfied() { private boolean isUpgradeDomainPolicySatisfied() { if (numberOfReplicas <= upgradeDomainFactor) { - return (numberOfReplicas == upgradeDomains.size()); + return (numberOfReplicas <= upgradeDomains.size()); } else { return upgradeDomains.size() >= upgradeDomainFactor; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java new file mode 100644 index 0000000000..bfff9328a6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementStatusWithUpgradeDomain.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for BlockPlacementStatusWithUpgradeDomain class. + */ +public class TestBlockPlacementStatusWithUpgradeDomain { + + private Set upgradeDomains; + private BlockPlacementStatusDefault bpsd = + mock(BlockPlacementStatusDefault.class); + + @Before + public void setup() { + upgradeDomains = new HashSet(); + upgradeDomains.add("1"); + upgradeDomains.add("2"); + upgradeDomains.add("3"); + when(bpsd.isPlacementPolicySatisfied()).thenReturn(true); + } + + @Test + public void testIsPolicySatisfiedParentFalse() { + when(bpsd.isPlacementPolicySatisfied()).thenReturn(false); + BlockPlacementStatusWithUpgradeDomain bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3); + + // Parent policy is not satisfied but upgrade domain policy is + assertFalse(bps.isPlacementPolicySatisfied()); + } + + @Test + public void testIsPolicySatisfiedAllEqual() { + BlockPlacementStatusWithUpgradeDomain bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 3, 3); + // Number of domains, replicas and upgradeDomainFactor is equal and parent + // policy is satisfied + assertTrue(bps.isPlacementPolicySatisfied()); + } + + @Test + public void testIsPolicySatisifedSmallDomains() { + // Number of domains is less than replicas but equal to factor + BlockPlacementStatusWithUpgradeDomain bps = + new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 3); + assertTrue(bps.isPlacementPolicySatisfied()); + + // Same as above but replicas is greater than factor + bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 2); + assertTrue(bps.isPlacementPolicySatisfied()); + + // Number of domains is less than replicas and factor + bps = new BlockPlacementStatusWithUpgradeDomain(bpsd, upgradeDomains, 4, 4); + assertFalse(bps.isPlacementPolicySatisfied()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java index f9a2503b00..8460b6f5ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestUpgradeDomainBlockPlacementPolicy.java @@ -17,38 +17,40 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Arrays; -import java.util.EnumSet; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeAdminProperties; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; import org.apache.hadoop.hdfs.server.blockmanagement.HostConfigManager; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.util.HostsFileWriter; import org.apache.hadoop.net.StaticMapping; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.google.common.base.Supplier; + /** * End-to-end test case for upgrade domain * The test configs upgrade domain for nodes via admin json @@ -69,15 +71,16 @@ public class TestUpgradeDomainBlockPlacementPolicy { * CombinedHostFileManager won't allow those hosts. */ static final String[] hosts = - { "127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1", - "127.0.0.1", "127.0.0.1" }; + {"127.0.0.1", "127.0.0.1", "127.0.0.1", "127.0.0.1", + "127.0.0.1", "127.0.0.1"}; static final String[] upgradeDomains = - { "ud1", "ud2", "ud3", "ud1", "ud2", "ud3" }; + {"ud5", "ud2", "ud3", "ud1", "ud2", "ud4"}; static final Set expectedDatanodeIDs = new HashSet<>(); private MiniDFSCluster cluster = null; private NamenodeProtocols nameNodeRpc = null; private FSNamesystem namesystem = null; private PermissionStatus perm = null; + private HostsFileWriter hostsFileWriter = new HostsFileWriter(); @Before public void setup() throws IOException { @@ -86,11 +89,10 @@ public void setup() throws IOException { conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE / 2); conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, - BlockPlacementPolicyWithUpgradeDomain.class, - BlockPlacementPolicy.class); + BlockPlacementPolicyWithUpgradeDomain.class, + BlockPlacementPolicy.class); conf.setClass(DFSConfigKeys.DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY, - CombinedHostFileManager.class, HostConfigManager.class); - HostsFileWriter hostsFileWriter = new HostsFileWriter(); + CombinedHostFileManager.class, HostConfigManager.class); hostsFileWriter.initialize(conf, "temp/upgradedomainpolicy"); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).racks(racks) @@ -100,12 +102,12 @@ public void setup() throws IOException { namesystem = cluster.getNamesystem(); perm = new PermissionStatus("TestDefaultBlockPlacementPolicy", null, FsPermission.getDefault()); - refreshDatanodeAdminProperties(hostsFileWriter); - hostsFileWriter.cleanup(); + refreshDatanodeAdminProperties(); } @After - public void teardown() { + public void teardown() throws IOException { + hostsFileWriter.cleanup(); if (cluster != null) { cluster.shutdown(); cluster = null; @@ -114,15 +116,18 @@ public void teardown() { /** * Define admin properties for these datanodes as follows. - * dn0 and dn3 have upgrade domain ud1. - * dn1 and dn4 have upgrade domain ud2. - * dn2 and dn5 have upgrade domain ud3. + * dn0's upgrade domain is ud5. + * dn1's upgrade domain is ud2. + * dn2's upgrade domain is ud3. + * dn3's upgrade domain is ud1. + * dn4's upgrade domain is ud2. + * dn5's upgrade domain is ud4. * dn0 and dn5 are decommissioned. * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on * rack2. Then any block's replicas should be on either * {dn1, dn2, d3} or {dn2, dn3, dn4}. */ - private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter) + private void refreshDatanodeAdminProperties() throws IOException { DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[ hosts.length]; @@ -138,32 +143,116 @@ private void refreshDatanodeAdminProperties(HostsFileWriter hostsFileWriter) hostsFileWriter.initIncludeHosts(datanodes); cluster.getFileSystem().refreshNodes(); + expectedDatanodeIDs.clear(); expectedDatanodeIDs.add(cluster.getDataNodes().get(2).getDatanodeId()); expectedDatanodeIDs.add(cluster.getDataNodes().get(3).getDatanodeId()); } + /** + * Define admin properties for these datanodes as follows. + * dn0's upgrade domain is ud5. + * dn1's upgrade domain is ud2. + * dn2's upgrade domain is ud3. + * dn3's upgrade domain is ud1. + * dn4's upgrade domain is ud2. + * dn5's upgrade domain is ud4. + * dn2 and dn3 are decommissioned. + * Given dn0, dn1 and dn2 are on rack1 and dn3, dn4 and dn5 are on + * rack2. Then any block's replicas should be on either + * {dn0, dn1, d5} or {dn0, dn4, dn5}. + */ + private void refreshDatanodeAdminProperties2() + throws IOException { + DatanodeAdminProperties[] datanodes = new DatanodeAdminProperties[ + hosts.length]; + for (int i = 0; i < hosts.length; i++) { + datanodes[i] = new DatanodeAdminProperties(); + DatanodeID datanodeID = cluster.getDataNodes().get(i).getDatanodeId(); + datanodes[i].setHostName(datanodeID.getHostName()); + datanodes[i].setPort(datanodeID.getXferPort()); + datanodes[i].setUpgradeDomain(upgradeDomains[i]); + } + datanodes[2].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + datanodes[3].setAdminState(DatanodeInfo.AdminStates.DECOMMISSIONED); + hostsFileWriter.initIncludeHosts(datanodes); + cluster.getFileSystem().refreshNodes(); + + expectedDatanodeIDs.clear(); + expectedDatanodeIDs.add(cluster.getDataNodes().get(0).getDatanodeId()); + expectedDatanodeIDs.add(cluster.getDataNodes().get(5).getDatanodeId()); + } + @Test public void testPlacement() throws Exception { - String clientMachine = "127.0.0.1"; - for (int i = 0; i < 5; i++) { - String src = "/test-" + i; - // Create the file with client machine - HdfsFileStatus fileStatus = namesystem.startFile(src, perm, - clientMachine, clientMachine, EnumSet.of(CreateFlag.CREATE), true, - REPLICATION_FACTOR, DEFAULT_BLOCK_SIZE, null, false); - LocatedBlock locatedBlock = nameNodeRpc.addBlock(src, clientMachine, - null, null, fileStatus.getFileId(), null, null); - - assertEquals("Block should be allocated sufficient locations", - REPLICATION_FACTOR, locatedBlock.getLocations().length); - Set locs = new HashSet<>(Arrays.asList( - locatedBlock.getLocations())); - for (DatanodeID datanodeID : expectedDatanodeIDs) { - locs.contains(datanodeID); + final long fileSize = DEFAULT_BLOCK_SIZE * 5; + final String testFile = new String("/testfile"); + final Path path = new Path(testFile); + DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, + REPLICATION_FACTOR, 1000L); + LocatedBlocks locatedBlocks = + cluster.getFileSystem().getClient().getLocatedBlocks( + path.toString(), 0, fileSize); + for (LocatedBlock block : locatedBlocks.getLocatedBlocks()) { + Set locs = new HashSet<>(); + for(DatanodeInfo datanodeInfo : block.getLocations()) { + if (datanodeInfo.getAdminState() == DatanodeInfo.AdminStates.NORMAL) { + locs.add(datanodeInfo); + } } + for (DatanodeID datanodeID : expectedDatanodeIDs) { + assertTrue(locs.contains(datanodeID)); + } + } + } - nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), - src, clientMachine); + @Test(timeout = 300000) + public void testPlacementAfterDecommission() throws Exception { + final long fileSize = DEFAULT_BLOCK_SIZE * 5; + final String testFile = new String("/testfile"); + final Path path = new Path(testFile); + DFSTestUtil.createFile(cluster.getFileSystem(), path, fileSize, + REPLICATION_FACTOR, 1000L); + + // Decommission some nodes and wait until decommissions have finished. + refreshDatanodeAdminProperties2(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + boolean successful = true; + LocatedBlocks locatedBlocks; + try { + locatedBlocks = + cluster.getFileSystem().getClient().getLocatedBlocks( + path.toString(), 0, fileSize); + } catch (IOException ioe) { + return false; + } + for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { + Set locs = new HashSet<>(); + for (DatanodeInfo datanodeInfo : block.getLocations()) { + if (datanodeInfo.getAdminState() == + DatanodeInfo.AdminStates.NORMAL) { + locs.add(datanodeInfo); + } + } + for (DatanodeID datanodeID : expectedDatanodeIDs) { + successful = successful && locs.contains(datanodeID); + } + } + return successful; + } + }, 1000, 60000); + + // Verify block placement policy of each block. + LocatedBlocks locatedBlocks; + locatedBlocks = + cluster.getFileSystem().getClient().getLocatedBlocks( + path.toString(), 0, fileSize); + for(LocatedBlock block : locatedBlocks.getLocatedBlocks()) { + BlockPlacementStatus status = cluster.getNamesystem().getBlockManager(). + getBlockPlacementPolicy().verifyBlockPlacement( + block.getLocations(), REPLICATION_FACTOR); + assertTrue(status.isPlacementPolicySatisfied()); } } }