HDFS-13167. DatanodeAdminManager Improvements. Contributed by BELUGA BEHR.

This commit is contained in:
Inigo Goiri 2018-02-20 15:18:27 -08:00
parent 17c592e6cf
commit 6f81cc0bee

View File

@@ -21,8 +21,9 @@
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -139,7 +140,7 @@ public class DatanodeAdminManager {
         new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
             .setDaemon(true).build());
     outOfServiceNodeBlocks = new TreeMap<>();
-    pendingNodes = new LinkedList<>();
+    pendingNodes = new ArrayDeque<>();
   }
 
   /**
@@ -219,7 +220,7 @@ public void startDecommission(DatanodeDescriptor node) {
         pendingNodes.add(node);
       }
     } else {
-      LOG.trace("startDecommission: Node {} in {}, nothing to do." +
+      LOG.trace("startDecommission: Node {} in {}, nothing to do.",
           node, node.getAdminState());
     }
   }
@@ -242,7 +243,7 @@ public void stopDecommission(DatanodeDescriptor node) {
       pendingNodes.remove(node);
       outOfServiceNodeBlocks.remove(node);
     } else {
-      LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
+      LOG.trace("stopDecommission: Node {} in {}, nothing to do.",
          node, node.getAdminState());
     }
   }
@@ -272,7 +273,7 @@ public void startMaintenance(DatanodeDescriptor node,
      // IN_MAINTENANCE to support maintenance expiration.
      pendingNodes.add(node);
    } else {
-      LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
+      LOG.trace("startMaintenance: Node {} in {}, nothing to do.",
          node, node.getAdminState());
    }
  }
@@ -321,7 +322,7 @@ public void stopMaintenance(DatanodeDescriptor node) {
      pendingNodes.remove(node);
      outOfServiceNodeBlocks.remove(node);
    } else {
-      LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
+      LOG.trace("stopMaintenance: Node {} in {}, nothing to do.",
          node, node.getAdminState());
    }
  }
@@ -395,7 +396,7 @@ private void logBlockReplicationInfo(BlockInfo block,
     for (DatanodeStorageInfo storage : storages) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       nodeList.append(node);
-      nodeList.append(" ");
+      nodeList.append(' ');
     }
     NameNode.blockStateChangeLog.info(
         "Block: " + block + ", Expected Replicas: "
@@ -517,7 +518,7 @@ private void check() {
       final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
           it = new CyclicIteration<>(outOfServiceNodeBlocks,
               iterkey).iterator();
-      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+      final List<DatanodeDescriptor> toRemove = new ArrayList<>();
 
       while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
           .isRunning()) {
@@ -583,12 +584,12 @@ private void check() {
                 "A node is in an invalid state!");
           }
           LOG.debug("Node {} is sufficiently replicated and healthy, "
-              + "marked as {}.", dn.getAdminState());
+              + "marked as {}.", dn, dn.getAdminState());
         } else {
           LOG.debug("Node {} {} healthy."
               + " It needs to replicate {} more blocks."
               + " {} is still in progress.", dn,
-              isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
+              isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState());
         }
       } else {
         LOG.debug("Node {} still has {} blocks to replicate "
@@ -744,10 +745,10 @@ private void processBlocksInternal(
           lowRedundancyBlocks++;
           if (bc.isUnderConstruction()) {
             INode ucFile = namesystem.getFSDirectory().getInode(bc.getId());
-            if(!(ucFile instanceof INodeFile) ||
+            if (!(ucFile instanceof INodeFile) ||
                 !ucFile.asFile().isUnderConstruction()) {
-              LOG.warn("File " + ucFile.getLocalName() + " is not under " +
-                  "construction. Skipping add to low redundancy open files!");
+              LOG.warn("File {} is not under construction. Skipping add to " +
+                  "low redundancy open files!", ucFile.getLocalName());
             } else {
               lowRedundancyBlocksInOpenFiles++;
               lowRedundancyOpenFiles.add(ucFile.getId());