HDFS-16303. Improve handling of datanode lost while decommissioning (#3675)

Co-authored-by: Kevin Wikant <wikak@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
KevinWikant 2021-12-23 08:59:08 -05:00 committed by GitHub
parent c8725de547
commit d20b598f97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 438 additions and 18 deletions

View File

@ -34,6 +34,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
import java.util.stream.Collectors;
/** /**
* This class implements the logic to track decommissioning and entering * This class implements the logic to track decommissioning and entering
@ -149,7 +150,7 @@ protected void processConf() {
*/ */
@Override @Override
public void stopTrackingNode(DatanodeDescriptor dn) { public void stopTrackingNode(DatanodeDescriptor dn) {
pendingNodes.remove(dn); getPendingNodes().remove(dn);
cancelledNodes.add(dn); cancelledNodes.add(dn);
} }
@ -189,6 +190,29 @@ public void run() {
* node will be removed from tracking by the pending cancel. * node will be removed from tracking by the pending cancel.
*/ */
processCancelledNodes(); processCancelledNodes();
// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numTrackedNodes = outOfServiceNodeBlocks.size();
int numQueuedNodes = getPendingNodes().size();
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);
// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
final List<DatanodeDescriptor> unhealthyDns = outOfServiceNodeBlocks.keySet().stream()
.filter(dn -> !blockManager.isNodeHealthyForDecommissionOrMaintenance(dn))
.collect(Collectors.toList());
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
pendingRep.remove(dn);
});
}
processPendingNodes(); processPendingNodes();
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
@ -207,7 +231,7 @@ public void run() {
LOG.info("Checked {} blocks this tick. {} nodes are now " + LOG.info("Checked {} blocks this tick. {} nodes are now " +
"in maintenance or transitioning state. {} nodes pending. {} " + "in maintenance or transitioning state. {} nodes pending. {} " +
"nodes waiting to be cancelled.", "nodes waiting to be cancelled.",
numBlocksChecked, outOfServiceNodeBlocks.size(), pendingNodes.size(), numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
cancelledNodes.size()); cancelledNodes.size());
} }
} }
@ -220,10 +244,10 @@ public void run() {
* the pendingNodes list from being modified externally. * the pendingNodes list from being modified externally.
*/ */
private void processPendingNodes() { private void processPendingNodes() {
while (!pendingNodes.isEmpty() && while (!getPendingNodes().isEmpty() &&
(maxConcurrentTrackedNodes == 0 || (maxConcurrentTrackedNodes == 0 ||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) { outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null); outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
} }
} }

View File

@ -123,7 +123,7 @@ private boolean exceededNumBlocksPerCheck() {
@Override @Override
public void stopTrackingNode(DatanodeDescriptor dn) { public void stopTrackingNode(DatanodeDescriptor dn) {
pendingNodes.remove(dn); getPendingNodes().remove(dn);
outOfServiceNodeBlocks.remove(dn); outOfServiceNodeBlocks.remove(dn);
} }
@ -164,19 +164,19 @@ public void run() {
LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " + LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
"in maintenance or transitioning state. {} nodes pending.", "in maintenance or transitioning state. {} nodes pending.",
numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(), numBlocksChecked, numNodesChecked, outOfServiceNodeBlocks.size(),
pendingNodes.size()); getPendingNodes().size());
} }
} }
/** /**
* Pop datanodes off the pending list and into decomNodeBlocks, * Pop datanodes off the pending priority queue and into decomNodeBlocks,
* subject to the maxConcurrentTrackedNodes limit. * subject to the maxConcurrentTrackedNodes limit.
*/ */
private void processPendingNodes() { private void processPendingNodes() {
while (!pendingNodes.isEmpty() && while (!getPendingNodes().isEmpty() &&
(maxConcurrentTrackedNodes == 0 || (maxConcurrentTrackedNodes == 0 ||
outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) { outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
outOfServiceNodeBlocks.put(pendingNodes.poll(), null); outOfServiceNodeBlocks.put(getPendingNodes().poll(), null);
} }
} }
@ -185,6 +185,7 @@ private void check() {
it = new CyclicIteration<>(outOfServiceNodeBlocks, it = new CyclicIteration<>(outOfServiceNodeBlocks,
iterkey).iterator(); iterkey).iterator();
final List<DatanodeDescriptor> toRemove = new ArrayList<>(); final List<DatanodeDescriptor> toRemove = new ArrayList<>();
final List<DatanodeDescriptor> unhealthyDns = new ArrayList<>();
while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
.isRunning()) { .isRunning()) {
@ -221,6 +222,10 @@ private void check() {
LOG.debug("Processing {} node {}", dn.getAdminState(), dn); LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
pruneReliableBlocks(dn, blocks); pruneReliableBlocks(dn, blocks);
} }
final boolean isHealthy = blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (!isHealthy) {
unhealthyDns.add(dn);
}
if (blocks.size() == 0) { if (blocks.size() == 0) {
if (!fullScan) { if (!fullScan) {
// If we didn't just do a full scan, need to re-check with the // If we didn't just do a full scan, need to re-check with the
@ -236,8 +241,6 @@ private void check() {
} }
// If the full scan is clean AND the node liveness is okay, // If the full scan is clean AND the node liveness is okay,
// we can finally mark as DECOMMISSIONED or IN_MAINTENANCE. // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
final boolean isHealthy =
blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (blocks.size() == 0 && isHealthy) { if (blocks.size() == 0 && isHealthy) {
if (dn.isDecommissionInProgress()) { if (dn.isDecommissionInProgress()) {
dnAdmin.setDecommissioned(dn); dnAdmin.setDecommissioned(dn);
@ -270,12 +273,31 @@ private void check() {
// an invalid state. // an invalid state.
LOG.warn("DatanodeAdminMonitor caught exception when processing node " LOG.warn("DatanodeAdminMonitor caught exception when processing node "
+ "{}.", dn, e); + "{}.", dn, e);
pendingNodes.add(dn); getPendingNodes().add(dn);
toRemove.add(dn); toRemove.add(dn);
} finally { } finally {
iterkey = dn; iterkey = dn;
} }
} }
// Having more nodes decommissioning than can be tracked will impact decommissioning
// performance due to queueing delay
int numTrackedNodes = outOfServiceNodeBlocks.size() - toRemove.size();
int numQueuedNodes = getPendingNodes().size();
int numDecommissioningNodes = numTrackedNodes + numQueuedNodes;
if (numDecommissioningNodes > maxConcurrentTrackedNodes) {
LOG.warn(
"{} nodes are decommissioning but only {} nodes will be tracked at a time. "
+ "{} nodes are currently queued waiting to be decommissioned.",
numDecommissioningNodes, maxConcurrentTrackedNodes, numQueuedNodes);
// Re-queue unhealthy nodes to make space for decommissioning healthy nodes
getUnhealthyNodesToRequeue(unhealthyDns, numDecommissioningNodes).forEach(dn -> {
getPendingNodes().add(dn);
outOfServiceNodeBlocks.remove(dn);
});
}
// Remove the datanodes that are DECOMMISSIONED or in service after // Remove the datanodes that are DECOMMISSIONED or in service after
// maintenance expiration. // maintenance expiration.
for (DatanodeDescriptor dn : toRemove) { for (DatanodeDescriptor dn : toRemove) {

View File

@ -24,8 +24,11 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayDeque; import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.stream.Stream;
/** /**
* This abstract class provides some base methods which are inherited by * This abstract class provides some base methods which are inherited by
@ -35,12 +38,20 @@
public abstract class DatanodeAdminMonitorBase public abstract class DatanodeAdminMonitorBase
implements DatanodeAdminMonitorInterface, Configurable { implements DatanodeAdminMonitorInterface, Configurable {
/**
* Sort by lastUpdate time descending order, such that unhealthy
* nodes are de-prioritized given they cannot be decommissioned.
*/
static final Comparator<DatanodeDescriptor> PENDING_NODES_QUEUE_COMPARATOR =
(dn1, dn2) -> Long.compare(dn2.getLastUpdate(), dn1.getLastUpdate());
protected BlockManager blockManager; protected BlockManager blockManager;
protected Namesystem namesystem; protected Namesystem namesystem;
protected DatanodeAdminManager dnAdmin; protected DatanodeAdminManager dnAdmin;
protected Configuration conf; protected Configuration conf;
protected final Queue<DatanodeDescriptor> pendingNodes = new ArrayDeque<>(); private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
PENDING_NODES_QUEUE_COMPARATOR);
/** /**
* The maximum number of nodes to track in outOfServiceNodeBlocks. * The maximum number of nodes to track in outOfServiceNodeBlocks.
@ -151,4 +162,34 @@ public int getPendingNodeCount() {
public Queue<DatanodeDescriptor> getPendingNodes() { public Queue<DatanodeDescriptor> getPendingNodes() {
return pendingNodes; return pendingNodes;
} }
/**
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
* until it becomes healthy again. If there are more pendingNodes than can be tracked
* & some unhealthy tracked nodes, then re-queue the unhealthy tracked nodes
* to avoid blocking decommissioning of healthy nodes.
*
* @param unhealthyDns The unhealthy datanodes which may be re-queued
* @param numDecommissioningNodes The total number of nodes being decommissioned
* @return Stream of unhealthy nodes to be re-queued
*/
Stream<DatanodeDescriptor> getUnhealthyNodesToRequeue(
final List<DatanodeDescriptor> unhealthyDns, int numDecommissioningNodes) {
if (!unhealthyDns.isEmpty()) {
// Compute the number of unhealthy nodes to re-queue
final int numUnhealthyNodesToRequeue =
Math.min(numDecommissioningNodes - maxConcurrentTrackedNodes, unhealthyDns.size());
LOG.warn("{} limit has been reached, re-queueing {} "
+ "nodes which are dead while in Decommission In Progress.",
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
numUnhealthyNodesToRequeue);
// Order unhealthy nodes by lastUpdate descending such that nodes
// which have been unhealthy the longest are preferred to be re-queued
return unhealthyDns.stream().sorted(PENDING_NODES_QUEUE_COMPARATOR.reversed())
.limit(numUnhealthyNodesToRequeue);
}
return Stream.empty();
}
} }

View File

@ -449,7 +449,7 @@ protected void refreshNodes(final int nnIndex) throws IOException {
refreshNodes(conf); refreshNodes(conf);
} }
static private DatanodeDescriptor getDatanodeDesriptor( static DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) { final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid); return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -26,6 +27,8 @@
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.PrintStream; import java.io.PrintStream;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -40,6 +43,8 @@
import java.util.EnumSet; import java.util.EnumSet;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.text.TextStringBuilder; import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -77,6 +82,7 @@
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -1299,7 +1305,7 @@ public void testBlocksPerInterval() throws Exception {
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
3); 3);
// Disable the normal monitor runs // Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE); Integer.MAX_VALUE);
startCluster(1, 3); startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem(); final FileSystem fs = getCluster().getFileSystem();
@ -1352,7 +1358,7 @@ public void testPendingNodeButDecommissioned() throws Exception {
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1); 1);
// Disable the normal monitor runs // Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE); Integer.MAX_VALUE);
startCluster(1, 2); startCluster(1, 2);
final DatanodeManager datanodeManager = final DatanodeManager datanodeManager =
@ -1401,7 +1407,7 @@ public void testPendingNodes() throws Exception {
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
1); 1);
// Disable the normal monitor runs // Disable the normal monitor runs
getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, getConf().setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY,
Integer.MAX_VALUE); Integer.MAX_VALUE);
startCluster(1, 3); startCluster(1, 3);
final FileSystem fs = getCluster().getFileSystem(); final FileSystem fs = getCluster().getFileSystem();
@ -1654,4 +1660,229 @@ public Boolean get() {
cleanupFile(fileSys, file); cleanupFile(fileSys, file);
} }
/**
* Test DatanodeAdminManager logic to re-queue unhealthy decommissioning nodes
* which are blocking the decommissioning of healthy nodes.
* Force the tracked nodes set to be filled with nodes lost while decommissioning,
* then decommission healthy nodes & validate they are decommissioned eventually.
*/
@Test(timeout = 120000)
public void testRequeueUnhealthyDecommissioningNodes() throws Exception {
// Create a MiniDFSCluster with 3 live datanode in AdminState=NORMAL and
// 2 dead datanodes in AdminState=DECOMMISSION_INPROGRESS and a file
// with replication factor of 5.
final int numLiveNodes = 3;
final int numDeadNodes = 2;
final int numNodes = numLiveNodes + numDeadNodes;
final List<DatanodeDescriptor> liveNodes = new ArrayList<>();
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps =
new HashMap<>();
final ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
final Path filePath = new Path("/tmp/test");
createClusterWithDeadNodesDecommissionInProgress(numLiveNodes, liveNodes, numDeadNodes,
deadNodeProps, decommissionedNodes, filePath);
final FSNamesystem namesystem = getCluster().getNamesystem();
final BlockManager blockManager = namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
// after several seconds of operation
final Duration checkDuration = Duration.ofSeconds(5);
Instant checkUntil = Instant.now().plus(checkDuration);
while (Instant.now().isBefore(checkUntil)) {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
0, decomManager.getNumPendingNodes());
assertEquals(
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
numDeadNodes, decomManager.getNumTrackedNodes());
assertTrue(
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
Thread.sleep(500);
}
// Delete the file such that its no longer a factor blocking decommissioning of live nodes
// which have block replicas for that file
getCluster().getFileSystem().delete(filePath, true);
// Start decommissioning 2 "live" datanodes
int numLiveDecommNodes = 2;
final List<DatanodeDescriptor> liveDecommNodes = liveNodes.subList(0, numLiveDecommNodes);
for (final DatanodeDescriptor liveNode : liveDecommNodes) {
takeNodeOutofService(0, liveNode.getDatanodeUuid(), 0, decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(liveNode);
}
// Write a new file such that there are under-replicated blocks preventing decommissioning
// of dead nodes
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
// Validate that the live datanodes are put into the pending decommissioning queue
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == numDeadNodes
&& decomManager.getNumPendingNodes() == numLiveDecommNodes
&& liveDecommNodes.stream().allMatch(
node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)),
500, 30000);
assertThat(liveDecommNodes)
.as("Check all live decommissioning nodes queued in DatanodeAdminManager")
.containsAll(decomManager.getPendingNodes());
// Run DatanodeAdminManager.Monitor, then validate the dead nodes are re-queued & the
// live nodes are decommissioned
if (this instanceof TestDecommissionWithBackoffMonitor) {
// For TestDecommissionWithBackoffMonitor a single tick/execution of the
// DatanodeAdminBackoffMonitor will re-queue the dead nodes, then call
// "processPendingNodes" to de-queue the live nodes & decommission them
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
2, decomManager.getNumPendingNodes());
assertEquals(
"DatanodeAdminBackoffMonitor did not re-queue dead decommissioning nodes as expected.",
0, decomManager.getNumTrackedNodes());
} else {
// For TestDecommission a single tick/execution of the DatanodeAdminDefaultMonitor
// will re-queue the dead nodes. A seconds tick is needed to de-queue the live nodes
// & decommission them
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
4, decomManager.getNumPendingNodes());
assertEquals(
"DatanodeAdminDefaultMonitor did not re-queue dead decommissioning nodes as expected.",
0, decomManager.getNumTrackedNodes());
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
2, decomManager.getNumPendingNodes());
assertEquals(
"DatanodeAdminDefaultMonitor did not decommission live nodes as expected.",
0, decomManager.getNumTrackedNodes());
}
assertTrue("Live nodes not DECOMMISSIONED as expected.", liveDecommNodes.stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED)));
assertTrue("Dead nodes not DECOMMISSION_INPROGRESS as expected.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
assertThat(deadNodeProps.keySet())
.as("Check all dead decommissioning nodes queued in DatanodeAdminManager")
.containsAll(decomManager.getPendingNodes());
// Validate the 2 "dead" nodes are not removed from the tracked nodes set
// after several seconds of operation
checkUntil = Instant.now().plus(checkDuration);
while (Instant.now().isBefore(checkUntil)) {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
assertEquals(
"Unexpected number of decommissioning nodes queued in DatanodeAdminManager.",
0, decomManager.getNumPendingNodes());
assertEquals(
"Unexpected number of decommissioning nodes tracked in DatanodeAdminManager.",
numDeadNodes, decomManager.getNumTrackedNodes());
assertTrue(
"Dead decommissioning nodes unexpectedly transitioned out of DECOMMISSION_INPROGRESS.",
deadNodeProps.keySet().stream()
.allMatch(node -> node.getAdminState().equals(AdminStates.DECOMMISSION_INPROGRESS)));
Thread.sleep(500);
}
// Delete the file such that there are no more under-replicated blocks
// allowing the dead nodes to be decommissioned
getCluster().getFileSystem().delete(filePath, true);
// Validate the dead nodes are eventually decommissioned
GenericTestUtils.waitFor(() -> {
try {
BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Exception running DatanodeAdminMonitor", e);
return false;
}
return decomManager.getNumTrackedNodes() == 0 && decomManager.getNumPendingNodes() == 0
&& deadNodeProps.keySet().stream().allMatch(
node -> node.getAdminState().equals(AdminStates.DECOMMISSIONED));
}, 500, 30000);
}
/**
* Create a MiniDFSCluster with "numLiveNodes" live datanodes in AdminState=NORMAL and
* "numDeadNodes" dead datanodes in AdminState=DECOMMISSION_INPROGRESS. Create a file
* replicated to all datanodes.
*
* @param numLiveNodes - number of live nodes in cluster
* @param liveNodes - list which will be loaded with references to 3 live datanodes
* @param numDeadNodes - number of live nodes in cluster
* @param deadNodeProps - map which will be loaded with references to 2 dead datanodes
* @param decommissionedNodes - list which will be loaded with references to decommissioning nodes
* @param filePath - path used to create HDFS file
*/
private void createClusterWithDeadNodesDecommissionInProgress(final int numLiveNodes,
final List<DatanodeDescriptor> liveNodes, final int numDeadNodes,
final Map<DatanodeDescriptor, MiniDFSCluster.DataNodeProperties> deadNodeProps,
final ArrayList<DatanodeInfo> decommissionedNodes, final Path filePath) throws Exception {
assertTrue("Must have numLiveNode > 0", numLiveNodes > 0);
assertTrue("Must have numDeadNode > 0", numDeadNodes > 0);
int numNodes = numLiveNodes + numDeadNodes;
// Allow "numDeadNodes" datanodes to be decommissioned at a time
getConf()
.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, numDeadNodes);
// Disable the normal monitor runs
getConf()
.setInt(MiniDFSCluster.DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY, Integer.MAX_VALUE);
// Start cluster with "numNodes" datanodes
startCluster(1, numNodes);
final FSNamesystem namesystem = getCluster().getNamesystem();
final BlockManager blockManager = namesystem.getBlockManager();
final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
final DatanodeAdminManager decomManager = datanodeManager.getDatanodeAdminManager();
assertEquals(numNodes, getCluster().getDataNodes().size());
getCluster().waitActive();
// "numLiveNodes" datanodes will remain "live"
for (final DataNode node : getCluster().getDataNodes().subList(0, numLiveNodes)) {
liveNodes.add(getDatanodeDesriptor(namesystem, node.getDatanodeUuid()));
}
assertEquals(numLiveNodes, liveNodes.size());
// "numDeadNodes" datanodes will be "dead" while decommissioning
final List<DatanodeDescriptor> deadNodes =
getCluster().getDataNodes().subList(numLiveNodes, numNodes).stream()
.map(dn -> getDatanodeDesriptor(namesystem, dn.getDatanodeUuid()))
.collect(Collectors.toList());
assertEquals(numDeadNodes, deadNodes.size());
// Create file with block replicas on all nodes
writeFile(getCluster().getFileSystem(), filePath, numNodes, 10);
// Cause the "dead" nodes to be lost while in state decommissioning
// and fill the tracked nodes set with those "dead" nodes
for (final DatanodeDescriptor deadNode : deadNodes) {
// Start decommissioning the node, it will not be able to complete due to the
// under-replicated file
takeNodeOutofService(0, deadNode.getDatanodeUuid(), 0, decommissionedNodes,
AdminStates.DECOMMISSION_INPROGRESS);
decommissionedNodes.add(deadNode);
// Stop the datanode so that it is lost while decommissioning
MiniDFSCluster.DataNodeProperties dn = getCluster().stopDataNode(deadNode.getXferAddr());
deadNodeProps.put(deadNode, dn);
deadNode.setLastUpdate(213); // Set last heartbeat to be in the past
}
assertEquals(numDeadNodes, deadNodeProps.size());
// Wait for the decommissioning nodes to become dead & to be added to "pendingNodes"
GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0
&& decomManager.getNumPendingNodes() == numDeadNodes
&& deadNodes.stream().allMatch(node ->
!BlockManagerTestUtil.isNodeHealthyForDecommissionOrMaintenance(blockManager, node)
&& !node.isAlive()), 500, 20000);
}
} }

View File

@ -367,6 +367,16 @@ public static void recheckDecommissionState(DatanodeManager dm)
dm.getDatanodeAdminManager().runMonitorForTest(); dm.getDatanodeAdminManager().runMonitorForTest();
} }
/**
* Have BlockManager check isNodeHealthyForDecommissionOrMaintenance for a given datanode.
* @param blockManager the BlockManager to check against
* @param dn the datanode to check
*/
public static boolean isNodeHealthyForDecommissionOrMaintenance(BlockManager blockManager,
DatanodeDescriptor dn) {
return blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
}
/** /**
* add block to the replicateBlocks queue of the Datanode * add block to the replicateBlocks queue of the Datanode
*/ */

View File

@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
public class TestDatanodeAdminMonitorBase {
public static final Logger LOG = LoggerFactory.getLogger(TestDatanodeAdminMonitorBase.class);
// Sort by lastUpdate time descending order, such that unhealthy
// nodes are de-prioritized given they cannot be decommissioned.
private static final int NUM_DATANODE = 10;
private static final int[] UNORDERED_LAST_UPDATE_TIMES =
new int[] {0, 5, 2, 11, 0, 3, 1001, 5, 1, 103};
private static final int[] ORDERED_LAST_UPDATE_TIMES =
new int[] {1001, 103, 11, 5, 5, 3, 2, 1, 0, 0};
private static final int[] REVERSE_ORDER_LAST_UPDATE_TIMES =
new int[] {0, 0, 1, 2, 3, 5, 5, 11, 103, 1001};
private static final DatanodeDescriptor[] NODES;
static {
NODES = new DatanodeDescriptor[NUM_DATANODE];
for (int i = 0; i < NUM_DATANODE; i++) {
NODES[i] = new DatanodeDescriptor(DatanodeID.EMPTY_DATANODE_ID);
NODES[i].setLastUpdate(UNORDERED_LAST_UPDATE_TIMES[i]);
NODES[i].setLastUpdateMonotonic(UNORDERED_LAST_UPDATE_TIMES[i]);
}
}
/**
* Verify that DatanodeAdminManager pendingNodes priority queue
* correctly orders the nodes by lastUpdate time descending.
*/
@Test
public void testPendingNodesQueueOrdering() {
final PriorityQueue<DatanodeDescriptor> pendingNodes =
new PriorityQueue<>(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR);
pendingNodes.addAll(Arrays.asList(NODES));
for (int i = 0; i < NUM_DATANODE; i++) {
final DatanodeDescriptor dn = pendingNodes.poll();
Assert.assertNotNull(dn);
Assert.assertEquals(ORDERED_LAST_UPDATE_TIMES[i], dn.getLastUpdate());
}
}
/**
* Verify that DatanodeAdminManager logic to sort unhealthy nodes
* correctly orders the nodes by lastUpdate time ascending.
*/
@Test
public void testPendingNodesQueueReverseOrdering() {
final List<DatanodeDescriptor> nodes = Arrays.asList(NODES);
final List<DatanodeDescriptor> reverseOrderNodes =
nodes.stream().sorted(DatanodeAdminMonitorBase.PENDING_NODES_QUEUE_COMPARATOR.reversed())
.collect(Collectors.toList());
Assert.assertEquals(NUM_DATANODE, reverseOrderNodes.size());
for (int i = 0; i < NUM_DATANODE; i++) {
Assert.assertEquals(REVERSE_ORDER_LAST_UPDATE_TIMES[i],
reverseOrderNodes.get(i).getLastUpdate());
}
}
}