From 644c2f6924f341f51d809c91dccfff88fc82f6f0 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 5 Oct 2017 16:58:43 -0700 Subject: [PATCH] HDFS-12567. BlockPlacementPolicyRackFaultTolerant fails with racks with very few nodes. --- .../blockmanagement/BlockPlacementPolicy.java | 2 +- ...BlockPlacementPolicyRackFaultTolerant.java | 49 ++++++-- .../hdfs/TestErasureCodingMultipleRacks.java | 107 ++++++++++++++++++ 3 files changed, 146 insertions(+), 12 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index 732a2dc7b0..23e3e40486 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -43,7 +43,7 @@ */ @InterfaceAudience.Private public abstract class BlockPlacementPolicy { - static final Logger LOG = LoggerFactory.getLogger( + public static final Logger LOG = LoggerFactory.getLogger( BlockPlacementPolicy.class); @InterfaceAudience.Private diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java index c0d981c63d..1eac3eaf9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java @@ -46,9 +46,12 @@ protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { if (numOfRacks == 1 || totalNumOfReplicas <= 1) { return new int[] {numOfReplicas, totalNumOfReplicas}; } - if(totalNumOfReplicas(excludedNodes), - blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes); + try { + // Try to spread the replicas as evenly as possible across racks. + // This is done by first placing with (maxNodesPerRack-1), then spreading + // the remainder by calling again with maxNodesPerRack. + writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes), + blocksize, maxNodesPerRack - 1, results, avoidStaleNodes, + storageTypes); - for (DatanodeStorageInfo resultStorage : results) { - addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes); + // Exclude the chosen nodes + for (DatanodeStorageInfo resultStorage : results) { + addToExcludedNodes(resultStorage.getDatanodeDescriptor(), + excludedNodes); + } + LOG.trace("Chosen nodes: {}", results); + LOG.trace("Excluded nodes: {}", excludedNodes); + + numOfReplicas = totalReplicaExpected - results.size(); + chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, + maxNodesPerRack, results, avoidStaleNodes, storageTypes); + } catch (NotEnoughReplicasException e) { + LOG.debug("Only able to place {} of {} (maxNodesPerRack={}) nodes " + + "evenly across racks, falling back to uneven placement.", + results.size(), numOfReplicas, maxNodesPerRack); + LOG.debug("Caught exception was:", e); + // Exclude the chosen nodes + for (DatanodeStorageInfo resultStorage : results) { + addToExcludedNodes(resultStorage.getDatanodeDescriptor(), + excludedNodes); + } + + LOG.trace("Chosen nodes: {}", results); + LOG.trace("Excluded nodes: {}", excludedNodes); + numOfReplicas = totalReplicaExpected - results.size(); + chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, + totalReplicaExpected, results, avoidStaleNodes, storageTypes); } - // For some racks, place one more replica to each one of them. - numOfReplicas = totalReplicaExpected - results.size(); - chooseOnce(numOfReplicas, writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageTypes); - return writer; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java new file mode 100644 index 0000000000..eb6213af14 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingMultipleRacks.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +/** + * Test erasure coding block placement with skewed # nodes per rack. + */ +public class TestErasureCodingMultipleRacks { + public static final Logger LOG = + LoggerFactory.getLogger(TestErasureCodingMultipleRacks.class); + + static { + GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockPlacementPolicyDefault.LOG, Level.DEBUG); + GenericTestUtils.setLogLevel(BlockPlacementPolicyRackFaultTolerant.LOG, + Level.DEBUG); + GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.DEBUG); + } + + @Rule + public Timeout globalTimeout = new Timeout(300000); + + public ErasureCodingPolicy getPolicy() { + return StripedFileTestUtil.getDefaultECPolicy(); + } + + private MiniDFSCluster cluster; + private ErasureCodingPolicy ecPolicy; + private Configuration conf; + private DistributedFileSystem dfs; + + @Before + public void setup() throws Exception { + ecPolicy = getPolicy(); + final int dataUnits = ecPolicy.getNumDataUnits(); + final int parityUnits = ecPolicy.getNumParityUnits(); + final int numDatanodes = dataUnits + parityUnits; + final int numRacks = 2; + final String[] racks = new String[numDatanodes]; + for (int i = 0; i < numRacks; i++) { + racks[i] = "/rack" + i; + } + for (int i = numRacks; i < numDatanodes; i++) { + racks[i] = "/rack" + (numRacks - 1); + } + conf = new HdfsConfiguration(); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .racks(racks) + .build(); + dfs = cluster.getFileSystem(); + cluster.waitActive(); + dfs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName()); + } + + @After + public void teardown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testSkewedRack() throws Exception { + final int filesize = ecPolicy.getNumDataUnits() * ecPolicy + .getCellSize(); + byte[] contents = new byte[filesize]; + + for (int i = 0; i < 10; i++) { + final Path path = new Path("/testfile" + i); + LOG.info("Writing file " + path); + DFSTestUtil.writeFile(dfs, path, contents); + } + } +}