HDFS-1480. All replicas of a block can end up on the same rack when some datanodes are decommissioning. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160897 13f79535-47bb-0310-9956-ffa450edef68

commit ec559db68e
parent 61cbfa5fde
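What goes wrong, in brief: when the namenode re-replicates a block whose replicas sit partly on decommissioning datanodes, the placement policy was handed every node containing the block as if those replicas were already "chosen", so a rack backed only by decommissioning replicas still looked covered and the new copies could all land on the other rack. The patch instead passes only the nodes holding live replicas as chosen, and explicitly excludes all containing nodes from being targets. A minimal, self-contained Java sketch of the failure mode -- invented names, none of the real HDFS types:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class Hdfs1480Sketch {
  // Toy stand-in for a replica location; not an HDFS class.
  static class Replica {
    final String host;
    final String rack;
    final boolean decommissioning;
    Replica(String host, String rack, boolean decommissioning) {
      this.host = host;
      this.rack = rack;
      this.decommissioning = decommissioning;
    }
  }

  public static void main(String[] args) {
    // Block on A1, A2 (rack A, both decommissioning) and B1 (rack B, live).
    List<Replica> replicas = Arrays.asList(
        new Replica("A1", "/rackA", true),
        new Replica("A2", "/rackA", true),
        new Replica("B1", "/rackB", false));

    // Old behavior: every containing node counted toward rack diversity, so
    // the policy saw both racks as covered and was free to place the new
    // targets anywhere -- e.g. B2 and B3, stranding all live replicas on /rackB.
    Set<String> racksSeenBefore = new HashSet<String>();
    for (Replica r : replicas) {
      racksSeenBefore.add(r.rack);
    }

    // Patched behavior: only live replicas count as already-chosen, so rack A
    // looks uncovered and at least one new target must be placed there.
    Set<String> racksSeenAfter = new HashSet<String>();
    for (Replica r : replicas) {
      if (!r.decommissioning) {
        racksSeenAfter.add(r.rack);
      }
    }

    System.out.println("before the patch, racks considered covered: " + racksSeenBefore);
    System.out.println("after the patch, racks considered covered:  " + racksSeenAfter);
  }
}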
CHANGES.txt
@@ -989,6 +989,9 @@ Trunk (unreleased changes)
     HDFS-2267. DataXceiver thread name incorrect while waiting on op during
     keepalive. (todd)
 
+    HDFS-1480. All replicas of a block can end up on the same rack when
+    some datanodes are decommissioning. (todd)
+
   BREAKDOWN OF HDFS-1073 SUBTASKS
 
     HDFS-1521. Persist transaction ID on disk between NN restarts.
BlockManager.java
@@ -66,6 +66,8 @@
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  * This class is a helper class for {@link FSNamesystem} and requires several
@@ -147,7 +149,8 @@ public long getExcessBlocksCount() {
   // We also store pending replication-orders.
   //
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
-  private final PendingReplicationBlocks pendingReplications;
+  @VisibleForTesting
+  final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
@@ -312,9 +315,14 @@ public void metaSave(PrintWriter out) {
     for (Block block : neededReplications) {
       List<DatanodeDescriptor> containingNodes =
                                           new ArrayList<DatanodeDescriptor>();
+      List<DatanodeDescriptor> containingLiveReplicasNodes =
+                                          new ArrayList<DatanodeDescriptor>();
+
       NumberReplicas numReplicas = new NumberReplicas();
       // source node returned is not used
-      chooseSourceDatanode(block, containingNodes, numReplicas);
+      chooseSourceDatanode(block, containingNodes,
+          containingLiveReplicasNodes, numReplicas);
+      assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
       int usableReplicas = numReplicas.liveReplicas() +
                            numReplicas.decommissionedReplicas();
 
@@ -993,9 +1001,10 @@ private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
    * @param priority a hint of its priority in the neededReplication queue
    * @return if the block gets replicated or not
    */
-  private boolean computeReplicationWorkForBlock(Block block, int priority) {
+  @VisibleForTesting
+  boolean computeReplicationWorkForBlock(Block block, int priority) {
     int requiredReplication, numEffectiveReplicas;
-    List<DatanodeDescriptor> containingNodes;
+    List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
@@ -1016,11 +1025,14 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
 
     // get a source data-node
     containingNodes = new ArrayList<DatanodeDescriptor>();
+    liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
     NumberReplicas numReplicas = new NumberReplicas();
-    srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
+    srcNode = chooseSourceDatanode(
+        block, containingNodes, liveReplicaNodes, numReplicas);
     if(srcNode == null) // block can not be replicated from any node
       return false;
 
+    assert liveReplicaNodes.size() == numReplicas.liveReplicas();
     // do not schedule more if enough replicas is already pending
     numEffectiveReplicas = numReplicas.liveReplicas() +
       pendingReplications.getNumReplicas(block);
@@ -1047,13 +1059,20 @@ private boolean computeReplicationWorkForBlock(Block block, int priority) {
     } finally {
       namesystem.writeUnlock();
     }
 
+    // Exclude all of the containing nodes from being targets.
+    // This list includes decommissioning or corrupt nodes.
+    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    for (DatanodeDescriptor dn : containingNodes) {
+      excludedNodes.put(dn, dn);
+    }
+
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] =
                        blockplacement.chooseTarget(fileINode, additionalReplRequired,
-                       srcNode, containingNodes, block.getNumBytes());
+                       srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
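This hunk is the crux of the fix: liveReplicaNodes, not containingNodes, now plays the role of already-chosen nodes, while every containing node -- live, decommissioning, or corrupt -- goes into excludedNodes so it can never be picked as a target. A small, self-contained sketch of that step, with plain strings standing in for the HDFS node types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExclusionSketch {
  public static void main(String[] args) {
    // Nodes holding any replica of the block, including decommissioning ones.
    List<String> containingNodes = Arrays.asList("A1", "A2", "B1");
    // Only B1's replica is live; A1 and A2 are decommissioning.
    List<String> liveReplicaNodes = Collections.singletonList("B1");
    List<String> allNodes = Arrays.asList("A1", "A2", "A3", "B1", "B2", "B3");

    // The patch uses HashMap<Node, Node> because that is the map type the
    // placement policy already accepts for exclusions; it is used as a set.
    Map<String, String> excludedNodes = new HashMap<String, String>();
    for (String dn : containingNodes) {
      excludedNodes.put(dn, dn);
    }

    // A target candidate survives only if it holds no replica at all.
    List<String> candidates = new ArrayList<String>();
    for (String dn : allNodes) {
      if (!excludedNodes.containsKey(dn)) {
        candidates.add(dn);
      }
    }

    System.out.println("treated as already-chosen: " + liveReplicaNodes);
    System.out.println("eligible new targets: " + candidates); // A3, B2, B3
  }
}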
@@ -1182,8 +1201,10 @@ public DatanodeDescriptor[] chooseTarget(final String src,
   private DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
+                                    List<DatanodeDescriptor> nodesContainingLiveReplicas,
                                     NumberReplicas numReplicas) {
     containingNodes.clear();
+    nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
     int live = 0;
     int decommissioned = 0;
@@ -1202,6 +1223,7 @@ else if (node.isDecommissionInProgress() || node.isDecommissioned())
       else if (excessBlocks != null && excessBlocks.contains(block)) {
         excess++;
       } else {
+        nodesContainingLiveReplicas.add(node);
         live++;
       }
       containingNodes.add(node);
@@ -2049,7 +2071,8 @@ private long addBlock(Block block, List<BlockWithLocations> results) {
   /**
    * The given node is reporting that it received a certain block.
    */
-  private void addBlock(DatanodeDescriptor node, Block block, String delHint)
+  @VisibleForTesting
+  void addBlock(DatanodeDescriptor node, Block block, String delHint)
       throws IOException {
     // decrement number of blocks scheduled to this datanode.
     node.decBlocksScheduled();
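Worth noting: pendingReplications, computeReplicationWorkForBlock, and addBlock all go from private to package-private in these hunks, each tagged with Guava's @VisibleForTesting, so the new unit test below can schedule replication work and report received blocks directly instead of spinning up a full cluster.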
BlockPlacementPolicy.java
@@ -127,9 +127,10 @@ DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
                                     int numOfReplicas,
                                     DatanodeDescriptor writer,
                                     List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
-                        chosenNodes, blocksize);
+                        chosenNodes, excludedNodes, blocksize);
   }
 
   /**
BlockPlacementPolicyDefault.java
@@ -102,16 +102,6 @@ public DatanodeDescriptor[] chooseTarget(String srcPath,
                       excludedNodes, blocksize);
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
-                                           int numOfReplicas,
-                                           DatanodeDescriptor writer,
-                                           List<DatanodeDescriptor> chosenNodes,
-                                           long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, false,
-                        null, blocksize);
-  }
-
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
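The deleted override above appears to be where exclusions used to get dropped: BlockPlacementPolicyDefault's FSInodeInfo variant passed a hard-coded null for the excluded-nodes argument of the implementation. With the base-class method (previous hunk) now accepting and forwarding excludedNodes, removing the override lets the exclusion list built in BlockManager actually reach the implementation.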
TestBlockManager.java (new file)
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+
+public class TestBlockManager {
+  private final List<DatanodeDescriptor> nodes = ImmutableList.of(
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/rackA"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/rackB"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/rackB")
+    );
+  private final List<DatanodeDescriptor> rackA = nodes.subList(0, 3);
+  private final List<DatanodeDescriptor> rackB = nodes.subList(3, 6);
+
+  /**
+   * Some of these tests exercise code which has some randomness involved -
+   * ie even if there's a bug, they may pass because the random node selection
+   * chooses the correct result.
+   *
+   * Since they're true unit tests and run quickly, we loop them a number
+   * of times trying to trigger the incorrect behavior.
+   */
+  private static final int NUM_TEST_ITERS = 30;
+
+  private static final int BLOCK_SIZE = 64*1024;
+
+  private Configuration conf;
+  private FSNamesystem fsn;
+  private BlockManager bm;
+
+  @Before
+  public void setupMockCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+             "need to set a dummy value here so it assumes a multi-rack cluster");
+    fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.doReturn(true).when(fsn).hasWriteLock();
+    bm = new BlockManager(fsn, conf);
+  }
+
+  private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    // construct network topology
+    for (DatanodeDescriptor dn : nodesToAdd) {
+      cluster.add(dn);
+      dn.updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+  }
+
+  private void removeNode(DatanodeDescriptor deadNode) {
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    cluster.remove(deadNode);
+    bm.removeBlocksAssociatedTo(deadNode);
+  }
+
+
+  /**
+   * Test that replication of under-replicated blocks is detected
+   * and basically works
+   */
+  @Test
+  public void testBasicReplication() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doBasicTest(i);
+    }
+  }
+
+  private void doBasicTest(int testIndex) {
+    List<DatanodeDescriptor> origNodes = nodes(0, 1);
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertEquals(2, pipeline.length);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+        "Was: " + pipeline[1],
+        rackB.contains(pipeline[1]));
+  }
+
+
+  /**
+   * Regression test for HDFS-1480
+   * - Cluster has 2 racks, A and B, each with three nodes.
+   * - Block initially written on A1, A2, B1
+   * - Admin decommissions two of these nodes (let's say A1 and A2 but it doesn't matter)
+   * - Re-replication should respect rack policy
+   */
+  @Test
+  public void testTwoOfThreeNodesDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestTwoOfThreeNodesDecommissioned(i);
+    }
+  }
+
+  private void doTestTwoOfThreeNodesDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission two of the nodes (A1, A2)
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have two targets", 3, pipeline.length);
+
+    boolean foundOneOnRackA = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackA);
+  }
+
+
+  /**
+   * Test what happens when a block is on three nodes, and all three of those
+   * nodes are decommissioned. It should properly re-replicate to three new
+   * nodes.
+   */
+  @Test
+  public void testAllNodesHoldingReplicasDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestAllNodesHoldingReplicasDecommissioned(i);
+    }
+  }
+
+  private void doTestAllNodesHoldingReplicasDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission all of the nodes
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 3);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have three targets", 4, pipeline.length);
+
+    boolean foundOneOnRackA = false;
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackA.contains(target)) {
+        foundOneOnRackA = true;
+      } else if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack A. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackA);
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackB);
+  }
+
+  /**
+   * Test what happens when there are two racks, and an entire rack is
+   * decommissioned.
+   *
+   * Since the cluster is multi-rack, it will consider the block
+   * under-replicated rather than create a third replica on the
+   * same rack. Adding a new node on a third rack should cause re-replication
+   * to that node.
+   */
+  @Test
+  public void testOneOfTwoRacksDecommissioned() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestOneOfTwoRacksDecommissioned(i);
+    }
+  }
+
+  private void doTestOneOfTwoRacksDecommissioned(int testIndex) throws Exception {
+    // Block originally on A1, A2, B1
+    List<DatanodeDescriptor> origNodes = nodes(0, 1, 3);
+    BlockInfo blockInfo = addBlockOnNodes(testIndex, origNodes);
+
+    // Decommission all of the nodes in rack A
+    List<DatanodeDescriptor> decomNodes = startDecommission(0, 1, 2);
+
+    DatanodeDescriptor[] pipeline = scheduleSingleReplication(blockInfo);
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertEquals("Should have 2 targets", 3, pipeline.length);
+
+    boolean foundOneOnRackB = false;
+    for (int i = 1; i < pipeline.length; i++) {
+      DatanodeDescriptor target = pipeline[i];
+      if (rackB.contains(target)) {
+        foundOneOnRackB = true;
+      }
+      assertFalse(decomNodes.contains(target));
+      assertFalse(origNodes.contains(target));
+    }
+
+    assertTrue("Should have at least one target on rack B. Pipeline: " +
+        Joiner.on(",").join(pipeline),
+        foundOneOnRackB);
+
+    // Mark the block as received on the target nodes in the pipeline
+    fulfillPipeline(blockInfo, pipeline);
+
+    // the block is still under-replicated. Add a new node. This should allow
+    // the third off-rack replica.
+    DatanodeDescriptor rackCNode = new DatanodeDescriptor(new DatanodeID("h7:5020"), "/rackC");
+    addNodes(ImmutableList.of(rackCNode));
+    try {
+      DatanodeDescriptor[] pipeline2 = scheduleSingleReplication(blockInfo);
+      assertEquals(2, pipeline2.length);
+      assertEquals(rackCNode, pipeline2[1]);
+    } finally {
+      removeNode(rackCNode);
+    }
+  }
+
+  /**
+   * Unit test version of testSufficientlyReplBlocksUsesNewRack from
+   * {@link TestBlocksWithNotEnoughRacks}.
+   **/
+  @Test
+  public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+    addNodes(nodes);
+    for (int i = 0; i < NUM_TEST_ITERS; i++) {
+      doTestSufficientlyReplBlocksUsesNewRack(i);
+    }
+  }
+
+  private void doTestSufficientlyReplBlocksUsesNewRack(int testIndex) {
+    // Originally on only nodes in rack A.
+    List<DatanodeDescriptor> origNodes = rackA;
+    BlockInfo blockInfo = addBlockOnNodes((long)testIndex, origNodes);
+    DatanodeDescriptor pipeline[] = scheduleSingleReplication(blockInfo);
+
+    assertEquals(2, pipeline.length); // single new copy
+    assertTrue("Source of replication should be one of the nodes the block " +
+        "was on. Was: " + pipeline[0],
+        origNodes.contains(pipeline[0]));
+    assertTrue("Destination of replication should be on the other rack. " +
+        "Was: " + pipeline[1],
+        rackB.contains(pipeline[1]));
+  }
+
+
+  /**
+   * Tell the block manager that replication is completed for the given
+   * pipeline.
+   */
+  private void fulfillPipeline(BlockInfo blockInfo,
+      DatanodeDescriptor[] pipeline) throws IOException {
+    for (int i = 1; i < pipeline.length; i++) {
+      bm.addBlock(pipeline[i], blockInfo, null);
+    }
+  }
+
+  private BlockInfo blockOnNodes(long blkId, List<DatanodeDescriptor> nodes) {
+    Block block = new Block(blkId);
+    BlockInfo blockInfo = new BlockInfo(block, 3);
+
+    for (DatanodeDescriptor dn : nodes) {
+      blockInfo.addNode(dn);
+    }
+    return blockInfo;
+  }
+
+  private List<DatanodeDescriptor> nodes(int ... indexes) {
+    List<DatanodeDescriptor> ret = Lists.newArrayList();
+    for (int idx : indexes) {
+      ret.add(nodes.get(idx));
+    }
+    return ret;
+  }
+
+  private List<DatanodeDescriptor> startDecommission(int ... indexes) {
+    List<DatanodeDescriptor> nodes = nodes(indexes);
+    for (DatanodeDescriptor node : nodes) {
+      node.startDecommission();
+    }
+    return nodes;
+  }
+
+  private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
+    INodeFile iNode = Mockito.mock(INodeFile.class);
+    Mockito.doReturn((short)3).when(iNode).getReplication();
+    BlockInfo blockInfo = blockOnNodes(blockId, nodes);
+
+    bm.blocksMap.addINode(blockInfo, iNode);
+    return blockInfo;
+  }
+
+  private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
+    assertEquals("Block not initially pending replication",
+        0, bm.pendingReplications.getNumReplicas(block));
+    assertTrue("computeReplicationWork should indicate replication is needed",
+        bm.computeReplicationWorkForBlock(block, 1));
+    assertTrue("replication is pending after work is computed",
+        bm.pendingReplications.getNumReplicas(block) > 0);
+
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+        getAllPendingReplications();
+    assertEquals(1, repls.size());
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    DatanodeDescriptor[] targets = repl.getValue().targets;
+
+    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
+    pipeline[0] = repl.getKey();
+    System.arraycopy(targets, 0, pipeline, 1, targets.length);
+
+    return pipeline;
+  }
+
+  private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingReplications() {
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
+        LinkedListMultimap.create();
+    for (DatanodeDescriptor dn : nodes) {
+      List<BlockTargetPair> thisRepls = dn.getReplicationCommand(10);
+      if (thisRepls != null) {
+        repls.putAll(dn, thisRepls);
+      }
+    }
+    return repls;
+  }
+}
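A note on how these tests work: they run against a Mockito-mocked FSNamesystem rather than a MiniDFSCluster, which is why the BlockManager internals were opened up with @VisibleForTesting in the hunks above. Because target selection involves randomness, each scenario loops NUM_TEST_ITERS (30) times so a regression cannot slip through on a lucky draw, as the class's own comment explains.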