HDFS-16583. DatanodeAdminDefaultMonitor can get stuck in an infinite loop holding the write lock (#4332)
Co-authored-by: S O'Donnell <sodonnell@cloudera.com>
(cherry picked from commit 297f0f6d6a)
This commit is contained in:
parent
ba856bff95
commit
55ba3a7944
@ -32,8 +32,6 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.ArrayDeque;
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -71,12 +69,6 @@ public class DatanodeAdminBackoffMonitor extends DatanodeAdminMonitorBase
|
|||||||
private HashMap<DatanodeDescriptor, HashMap<BlockInfo, Integer>>
|
private HashMap<DatanodeDescriptor, HashMap<BlockInfo, Integer>>
|
||||||
outOfServiceNodeBlocks = new HashMap<>();
|
outOfServiceNodeBlocks = new HashMap<>();
|
||||||
|
|
||||||
/**
|
|
||||||
* Any nodes where decommission or maintenance has been cancelled are added
|
|
||||||
* to this queue for later processing.
|
|
||||||
*/
|
|
||||||
private final Queue<DatanodeDescriptor> cancelledNodes = new ArrayDeque<>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The number of blocks to process when moving blocks to pendingReplication
|
* The number of blocks to process when moving blocks to pendingReplication
|
||||||
* before releasing and reclaiming the namenode lock.
|
* before releasing and reclaiming the namenode lock.
|
||||||
@ -151,7 +143,7 @@ protected void processConf() {
|
|||||||
@Override
|
@Override
|
||||||
public void stopTrackingNode(DatanodeDescriptor dn) {
|
public void stopTrackingNode(DatanodeDescriptor dn) {
|
||||||
getPendingNodes().remove(dn);
|
getPendingNodes().remove(dn);
|
||||||
cancelledNodes.add(dn);
|
getCancelledNodes().add(dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -232,7 +224,7 @@ public void run() {
|
|||||||
"in maintenance or transitioning state. {} nodes pending. {} " +
|
"in maintenance or transitioning state. {} nodes pending. {} " +
|
||||||
"nodes waiting to be cancelled.",
|
"nodes waiting to be cancelled.",
|
||||||
numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
|
numBlocksChecked, outOfServiceNodeBlocks.size(), getPendingNodes().size(),
|
||||||
cancelledNodes.size());
|
getCancelledNodes().size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,8 +251,8 @@ private void processPendingNodes() {
|
|||||||
* write lock to prevent the cancelledNodes list being modified externally.
|
* write lock to prevent the cancelledNodes list being modified externally.
|
||||||
*/
|
*/
|
||||||
private void processCancelledNodes() {
|
private void processCancelledNodes() {
|
||||||
while(!cancelledNodes.isEmpty()) {
|
while(!getCancelledNodes().isEmpty()) {
|
||||||
DatanodeDescriptor dn = cancelledNodes.poll();
|
DatanodeDescriptor dn = getCancelledNodes().poll();
|
||||||
outOfServiceNodeBlocks.remove(dn);
|
outOfServiceNodeBlocks.remove(dn);
|
||||||
pendingRep.remove(dn);
|
pendingRep.remove(dn);
|
||||||
}
|
}
|
||||||
|
@ -124,7 +124,7 @@ private boolean exceededNumBlocksPerCheck() {
|
|||||||
@Override
|
@Override
|
||||||
public void stopTrackingNode(DatanodeDescriptor dn) {
|
public void stopTrackingNode(DatanodeDescriptor dn) {
|
||||||
getPendingNodes().remove(dn);
|
getPendingNodes().remove(dn);
|
||||||
outOfServiceNodeBlocks.remove(dn);
|
getCancelledNodes().add(dn);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -152,6 +152,7 @@ public void run() {
|
|||||||
// Check decommission or maintenance progress.
|
// Check decommission or maintenance progress.
|
||||||
namesystem.writeLock();
|
namesystem.writeLock();
|
||||||
try {
|
try {
|
||||||
|
processCancelledNodes();
|
||||||
processPendingNodes();
|
processPendingNodes();
|
||||||
check();
|
check();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -180,6 +181,20 @@ private void processPendingNodes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process any nodes which have had their decommission or maintenance mode
|
||||||
|
* cancelled by an administrator.
|
||||||
|
*
|
||||||
|
* This method must be executed under the write lock to prevent the
|
||||||
|
* internal structures being modified concurrently.
|
||||||
|
*/
|
||||||
|
private void processCancelledNodes() {
|
||||||
|
while(!getCancelledNodes().isEmpty()) {
|
||||||
|
DatanodeDescriptor dn = getCancelledNodes().poll();
|
||||||
|
outOfServiceNodeBlocks.remove(dn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void check() {
|
private void check() {
|
||||||
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
|
final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
|
||||||
it = new CyclicIteration<>(outOfServiceNodeBlocks,
|
it = new CyclicIteration<>(outOfServiceNodeBlocks,
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.ArrayDeque;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
@ -53,6 +54,12 @@ public abstract class DatanodeAdminMonitorBase
|
|||||||
private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
|
private final PriorityQueue<DatanodeDescriptor> pendingNodes = new PriorityQueue<>(
|
||||||
PENDING_NODES_QUEUE_COMPARATOR);
|
PENDING_NODES_QUEUE_COMPARATOR);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Any nodes where decommission or maintenance has been cancelled are added
|
||||||
|
* to this queue for later processing.
|
||||||
|
*/
|
||||||
|
private final Queue<DatanodeDescriptor> cancelledNodes = new ArrayDeque<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of nodes to track in outOfServiceNodeBlocks.
|
* The maximum number of nodes to track in outOfServiceNodeBlocks.
|
||||||
* A value of 0 means no limit.
|
* A value of 0 means no limit.
|
||||||
@ -163,6 +170,11 @@ public Queue<DatanodeDescriptor> getPendingNodes() {
|
|||||||
return pendingNodes;
|
return pendingNodes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Queue<DatanodeDescriptor> getCancelledNodes() {
|
||||||
|
return cancelledNodes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
|
* If node "is dead while in Decommission In Progress", it cannot be decommissioned
|
||||||
* until it becomes healthy again. If there are more pendingNodes than can be tracked
|
* until it becomes healthy again. If there are more pendingNodes than can be tracked
|
||||||
|
@ -32,6 +32,7 @@ public interface DatanodeAdminMonitorInterface extends Runnable {
|
|||||||
int getTrackedNodeCount();
|
int getTrackedNodeCount();
|
||||||
int getNumNodesChecked();
|
int getNumNodesChecked();
|
||||||
Queue<DatanodeDescriptor> getPendingNodes();
|
Queue<DatanodeDescriptor> getPendingNodes();
|
||||||
|
Queue<DatanodeDescriptor> getCancelledNodes();
|
||||||
|
|
||||||
void setBlockManager(BlockManager bm);
|
void setBlockManager(BlockManager bm);
|
||||||
void setDatanodeAdminManager(DatanodeAdminManager dnm);
|
void setDatanodeAdminManager(DatanodeAdminManager dnm);
|
||||||
|
Loading…
Reference in New Issue
Block a user