diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java index 6327b4f254..82c201fe6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java @@ -484,6 +484,7 @@ public class DatanodeAdminManager { @Override public void run() { + LOG.debug("DatanodeAdminMonitor is running."); if (!namesystem.isRunning()) { LOG.info("Namesystem is not running, skipping " + "decommissioning/maintenance checks."); @@ -498,6 +499,9 @@ public class DatanodeAdminManager { try { processPendingNodes(); check(); + } catch (Exception e) { + LOG.warn("DatanodeAdminMonitor caught exception when processing node.", + e); } finally { namesystem.writeUnlock(); } @@ -531,83 +535,96 @@ public class DatanodeAdminManager { final Map.Entry> entry = it.next(); final DatanodeDescriptor dn = entry.getKey(); - AbstractList blocks = entry.getValue(); - boolean fullScan = false; - if (dn.isMaintenance() && dn.maintenanceExpired()) { - // If maintenance expires, stop tracking it. - stopMaintenance(dn); - toRemove.add(dn); - continue; - } - if (dn.isInMaintenance()) { - // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet. - continue; - } - if (blocks == null) { - // This is a newly added datanode, run through its list to schedule - // under-replicated blocks for replication and collect the blocks - // that are insufficiently replicated for further tracking - LOG.debug("Newly-added node {}, doing full scan to find " + - "insufficiently-replicated blocks.", dn); - blocks = handleInsufficientlyStored(dn); - outOfServiceNodeBlocks.put(dn, blocks); - fullScan = true; - } else { - // This is a known datanode, check if its # of insufficiently - // replicated blocks has dropped to zero and if it can move - // to the next state. - LOG.debug("Processing {} node {}", dn.getAdminState(), dn); - pruneReliableBlocks(dn, blocks); - } - if (blocks.size() == 0) { - if (!fullScan) { - // If we didn't just do a full scan, need to re-check with the - // full block map. - // - // We've replicated all the known insufficiently replicated - // blocks. Re-check with the full block map before finally - // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE. - LOG.debug("Node {} has finished replicating current set of " - + "blocks, checking with the full block map.", dn); + try { + AbstractList blocks = entry.getValue(); + boolean fullScan = false; + if (dn.isMaintenance() && dn.maintenanceExpired()) { + // If maintenance expires, stop tracking it. + stopMaintenance(dn); + toRemove.add(dn); + continue; + } + if (dn.isInMaintenance()) { + // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet. + continue; + } + if (blocks == null) { + // This is a newly added datanode, run through its list to schedule + // under-replicated blocks for replication and collect the blocks + // that are insufficiently replicated for further tracking + LOG.debug("Newly-added node {}, doing full scan to find " + + "insufficiently-replicated blocks.", dn); blocks = handleInsufficientlyStored(dn); outOfServiceNodeBlocks.put(dn, blocks); - } - // If the full scan is clean AND the node liveness is okay, - // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE. - final boolean isHealthy = - blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); - if (blocks.size() == 0 && isHealthy) { - if (dn.isDecommissionInProgress()) { - setDecommissioned(dn); - toRemove.add(dn); - } else if (dn.isEnteringMaintenance()) { - // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to - // to track maintenance expiration. - setInMaintenance(dn); - } else { - Preconditions.checkState(false, - "A node is in an invalid state!"); - } - LOG.debug("Node {} is sufficiently replicated and healthy, " - + "marked as {}.", dn, dn.getAdminState()); + fullScan = true; } else { - LOG.debug("Node {} {} healthy." - + " It needs to replicate {} more blocks." - + " {} is still in progress.", dn, - isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState()); + // This is a known datanode, check if its # of insufficiently + // replicated blocks has dropped to zero and if it can move + // to the next state. + LOG.debug("Processing {} node {}", dn.getAdminState(), dn); + pruneReliableBlocks(dn, blocks); } - } else { - LOG.debug("Node {} still has {} blocks to replicate " - + "before it is a candidate to finish {}.", - dn, blocks.size(), dn.getAdminState()); + if (blocks.size() == 0) { + if (!fullScan) { + // If we didn't just do a full scan, need to re-check with the + // full block map. + // + // We've replicated all the known insufficiently replicated + // blocks. Re-check with the full block map before finally + // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE. + LOG.debug("Node {} has finished replicating current set of " + + "blocks, checking with the full block map.", dn); + blocks = handleInsufficientlyStored(dn); + outOfServiceNodeBlocks.put(dn, blocks); + } + // If the full scan is clean AND the node liveness is okay, + // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE. + final boolean isHealthy = + blockManager.isNodeHealthyForDecommissionOrMaintenance(dn); + if (blocks.size() == 0 && isHealthy) { + if (dn.isDecommissionInProgress()) { + setDecommissioned(dn); + toRemove.add(dn); + } else if (dn.isEnteringMaintenance()) { + // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to + // to track maintenance expiration. + setInMaintenance(dn); + } else { + Preconditions.checkState(false, + "Node %s is in an invalid state! " + + "Invalid state: %s %s blocks are on this dn.", + dn, dn.getAdminState(), blocks.size()); + } + LOG.debug("Node {} is sufficiently replicated and healthy, " + + "marked as {}.", dn, dn.getAdminState()); + } else { + LOG.debug("Node {} {} healthy." + + " It needs to replicate {} more blocks." + + " {} is still in progress.", dn, + isHealthy ? "is": "isn't", blocks.size(), dn.getAdminState()); + } + } else { + LOG.debug("Node {} still has {} blocks to replicate " + + "before it is a candidate to finish {}.", + dn, blocks.size(), dn.getAdminState()); + } + } catch (Exception e) { + // Log and postpone to process node when meet exception since it is in + // an invalid state. + LOG.warn("DatanodeAdminMonitor caught exception when processing node " + + "{}.", dn, e); + pendingNodes.add(dn); + toRemove.add(dn); + } finally { + iterkey = dn; } - iterkey = dn; } // Remove the datanodes that are DECOMMISSIONED or in service after // maintenance expiration. for (DatanodeDescriptor dn : toRemove) { Preconditions.checkState(dn.isDecommissioned() || dn.isInService(), - "Removing a node that is not yet decommissioned or in service!"); + "Removing node %s that is not yet decommissioned or in service!", + dn); outOfServiceNodeBlocks.remove(dn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java index b5ff36c225..f6529e024b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -35,6 +36,7 @@ import java.util.Map; import java.util.Scanner; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -1181,6 +1183,56 @@ public class TestDecommission extends AdminStatesBaseTest { } } + /** + * Test DatanodeAdminManager#monitor can swallow any exceptions by default. + */ + @Test(timeout=120000) + public void testPendingNodeButDecommissioned() throws Exception { + // Only allow one node to be decom'd at a time + getConf().setInt( + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, + 1); + // Disable the normal monitor runs + getConf().setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, + Integer.MAX_VALUE); + startCluster(1, 2); + final DatanodeManager datanodeManager = + getCluster().getNamesystem().getBlockManager().getDatanodeManager(); + final DatanodeAdminManager decomManager = + datanodeManager.getDatanodeAdminManager(); + + ArrayList decommissionedNodes = Lists.newArrayList(); + List dns = getCluster().getDataNodes(); + // Try to decommission 2 datanodes + for (int i = 0; i < 2; i++) { + DataNode d = dns.get(i); + DatanodeInfo dn = takeNodeOutofService(0, d.getDatanodeUuid(), 0, + decommissionedNodes, AdminStates.DECOMMISSION_INPROGRESS); + decommissionedNodes.add(dn); + } + + assertEquals(2, decomManager.getNumPendingNodes()); + + // Set one datanode state to Decommissioned after decommission ops. + DatanodeDescriptor dn = datanodeManager.getDatanode(dns.get(0) + .getDatanodeId()); + dn.setDecommissioned(); + + try { + // Trigger DatanodeAdminManager#monitor + BlockManagerTestUtil.recheckDecommissionState(datanodeManager); + + // Wait for OutOfServiceNodeBlocks to be 0 + GenericTestUtils.waitFor(() -> decomManager.getNumTrackedNodes() == 0, + 500, 30000); + assertTrue(GenericTestUtils.anyThreadMatching( + Pattern.compile("DatanodeAdminMonitor-.*"))); + } catch (ExecutionException e) { + GenericTestUtils.assertExceptionContains("in an invalid state!", e); + fail("DatanodeAdminManager#monitor does not swallow exceptions."); + } + } + @Test(timeout=120000) public void testPendingNodes() throws Exception { org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)