diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7309846249..e63930a00b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2031,7 +2031,8 @@ List getDatanodeDescriptors(List nodes) { * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. - * We do not use already decommissioned nodes as a source. + * We do not use already decommissioned nodes as a source, unless there is + * no other choice. * Otherwise we randomly choose nodes among those that did not reach their * replication limits. However, if the recovery work is of the highest * priority and all nodes have reached their replication limits, we will @@ -2067,6 +2068,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, List srcNodes = new ArrayList<>(); liveBlockIndices.clear(); final boolean isStriped = block.isStriped(); + DatanodeDescriptor decommissionedSrc = null; BitSet bitSet = isStriped ? new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null; @@ -2085,13 +2087,24 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, continue; } - // never use already decommissioned nodes, maintenance node not - // suitable for read or unknown state replicas. - if (state == null || state == StoredReplicaState.DECOMMISSIONED + // Never use maintenance node not suitable for read + // or unknown state replicas. + if (state == null || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) { continue; } + // Save the live decommissioned replica in case we need it. 
Such replicas + // are normally not used for replication, but if nothing else is + // available, one can be selected as a source. + if (state == StoredReplicaState.DECOMMISSIONED) { + if (decommissionedSrc == null || + ThreadLocalRandom.current().nextBoolean()) { + decommissionedSrc = node; + } + continue; + } + if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance()) && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { @@ -2123,6 +2136,13 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, srcNodes.set(0, node); } } + + // Pick a live decommissioned replica, if nothing else is available. + if (!isStriped && nodesContainingLiveReplicas.isEmpty() && + srcNodes.isEmpty() && decommissionedSrc != null) { + srcNodes.add(decommissionedSrc); + } + return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]); } @@ -3036,7 +3056,7 @@ private Block addStoredBlock(final BlockInfo block, int curReplicaDelta; if (result == AddBlockResult.ADDED) { - curReplicaDelta = 1; + curReplicaDelta = (node.isDecommissioned()) ? 
0 : 1; if (logEveryBlock) { blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})", node, storedBlock, storedBlock.getNumBytes()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java index 3a26f4ae61..1a3848061d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java @@ -346,9 +346,9 @@ synchronized void update(BlockInfo block, int curReplicas, " curPri " + curPri + " oldPri " + oldPri); } - if(oldPri != curPri) { - remove(block, oldPri); - } + // oldPri is mostly correct, but not always. If not found with oldPri, + // other levels will be searched until the block is found & removed. + remove(block, oldPri); if(priorityQueues.get(curPri).add(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 8bdaa74e47..3cf025cea3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -72,6 +73,7 @@ public class TestDecommissioningStatus { private static FileSystem fileSys; private static HostsFileWriter hostsFileWriter; private static Configuration conf; + private Logger LOG; final ArrayList decommissionedNodes = new ArrayList(numDatanodes); @@ -89,8 +91,7 @@ public void setUp() throws Exception { conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt( DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4); - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, - 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1); @@ -100,6 +101,7 @@ public void setUp() throws Exception { cluster.getNamesystem().getBlockManager().getDatanodeManager() .setHeartbeatExpireInterval(3000); Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG); + LOG = Logger.getLogger(TestDecommissioningStatus.class); } @After @@ -366,4 +368,111 @@ public void testDecommissionDeadDN() throws Exception { hostsFileWriter.initExcludeHost(""); dm.refreshNodes(conf); } + + @Test(timeout=120000) + public void testDecommissionLosingData() throws Exception { + ArrayList nodes = new ArrayList(2); + FSNamesystem fsn = cluster.getNamesystem(); + BlockManager bm = fsn.getBlockManager(); + DatanodeManager dm = bm.getDatanodeManager(); + Path file1 = new Path("decommissionLosingData.dat"); + DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize, + (short)2, seed); + Thread.sleep(1000); + + // Shutdown dn1 + LOG.info("Shutdown dn1"); + DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId(); + String dnName = dnID.getXferAddr(); + 
DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN1 = cluster.stopDataNode(1); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Shutdown dn0 + LOG.info("Shutdown dn0"); + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + dnName = dnID.getXferAddr(); + DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID); + nodes.add(dnName); + DataNodeProperties stoppedDN0 = cluster.stopDataNode(0); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Decommission the nodes. + LOG.info("Decommissioning nodes"); + hostsFileWriter.initExcludeHosts(nodes); + dm.refreshNodes(conf); + BlockManagerTestUtil.recheckDecommissionState(dm); + assertTrue(dnDescriptor0.isDecommissioned()); + assertTrue(dnDescriptor1.isDecommissioned()); + + // All nodes are dead and decommissioned. Blocks should be missing. + long missingBlocks = bm.getMissingBlocksCount(); + long underreplicated = bm.getUnderReplicatedBlocksCount(); + assertTrue(missingBlocks > 0); + assertTrue(underreplicated > 0); + + // Bring back dn0 + LOG.info("Bring back dn0"); + cluster.restartDataNode(stoppedDN0, true); + do { + dnID = cluster.getDataNodes().get(0).getDatanodeId(); + } while (dnID == null); + dnDescriptor0 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor0.numBlocks() == 0) { + Thread.sleep(100); + } + + // Bring back dn1 + LOG.info("Bring back dn1"); + cluster.restartDataNode(stoppedDN1, true); + do { + dnID = cluster.getDataNodes().get(1).getDatanodeId(); + } while (dnID == null); + dnDescriptor1 = dm.getDatanode(dnID); + // Wait until it sends a block report. + while (dnDescriptor1.numBlocks() == 0) { + Thread.sleep(100); + } + + // Blocks should still be under-replicated + Thread.sleep(2000); // Let replication monitor run + assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount()); + + // Start up two more nodes. 
+ LOG.info("Starting two more nodes"); + cluster.startDataNodes(conf, 2, true, null, null); + cluster.waitActive(); + // Replication should fix it. + int count = 0; + while((bm.getUnderReplicatedBlocksCount() > 0 || + bm.getPendingReconstructionBlocksCount() > 0) && + count++ < 10) { + Thread.sleep(1000); + } + + assertEquals(0, bm.getUnderReplicatedBlocksCount()); + assertEquals(0, bm.getPendingReconstructionBlocksCount()); + assertEquals(0, bm.getMissingBlocksCount()); + + // Shutdown the extra nodes. + dnID = cluster.getDataNodes().get(3).getDatanodeId(); + cluster.stopDataNode(3); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + dnID = cluster.getDataNodes().get(2).getDatanodeId(); + cluster.stopDataNode(2); + DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), + false, 30000); + + // Call refreshNodes with an empty exclude file to remove the datanodes + // from the decommissioning list and make them available again. + hostsFileWriter.initExcludeHost(""); + dm.refreshNodes(conf); + fileSys.delete(file1, false); + } }