HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee.

This commit is contained in:
Kihwal Lee 2017-05-01 14:19:02 -05:00
parent 30fc580196
commit 07b98e7830
3 changed files with 139 additions and 10 deletions

View File

@ -2031,7 +2031,8 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
* *
* We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
* since the former do not have write traffic and hence are less busy. * since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source. * We do not use already decommissioned nodes as a source, unless there is
* no other choice.
* Otherwise we randomly choose nodes among those that did not reach their * Otherwise we randomly choose nodes among those that did not reach their
* replication limits. However, if the recovery work is of the highest * replication limits. However, if the recovery work is of the highest
* priority and all nodes have reached their replication limits, we will * priority and all nodes have reached their replication limits, we will
@ -2067,6 +2068,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
List<DatanodeDescriptor> srcNodes = new ArrayList<>(); List<DatanodeDescriptor> srcNodes = new ArrayList<>();
liveBlockIndices.clear(); liveBlockIndices.clear();
final boolean isStriped = block.isStriped(); final boolean isStriped = block.isStriped();
DatanodeDescriptor decommissionedSrc = null;
BitSet bitSet = isStriped ? BitSet bitSet = isStriped ?
new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null; new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
@ -2085,13 +2087,24 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
continue; continue;
} }
// never use already decommissioned nodes, maintenance node not // Never use maintenance node not suitable for read
// suitable for read or unknown state replicas. // or unknown state replicas.
if (state == null || state == StoredReplicaState.DECOMMISSIONED if (state == null
|| state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) { || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
continue; continue;
} }
// Save the live decommissioned replica in case we need it. Such replicas
// are normally not used for replication, but if nothing else is
// available, one can be selected as a source.
if (state == StoredReplicaState.DECOMMISSIONED) {
if (decommissionedSrc == null ||
ThreadLocalRandom.current().nextBoolean()) {
decommissionedSrc = node;
}
continue;
}
if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
&& (!node.isDecommissionInProgress() && !node.isEnteringMaintenance()) && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
@ -2123,6 +2136,13 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
srcNodes.set(0, node); srcNodes.set(0, node);
} }
} }
// Pick a live decommissioned replica, if nothing else is available.
if (!isStriped && nodesContainingLiveReplicas.isEmpty() &&
srcNodes.isEmpty() && decommissionedSrc != null) {
srcNodes.add(decommissionedSrc);
}
return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]); return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
} }
@ -3036,7 +3056,7 @@ private Block addStoredBlock(final BlockInfo block,
int curReplicaDelta; int curReplicaDelta;
if (result == AddBlockResult.ADDED) { if (result == AddBlockResult.ADDED) {
curReplicaDelta = 1; curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
if (logEveryBlock) { if (logEveryBlock) {
blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})", blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
node, storedBlock, storedBlock.getNumBytes()); node, storedBlock, storedBlock.getNumBytes());

View File

@ -346,9 +346,9 @@ synchronized void update(BlockInfo block, int curReplicas,
" curPri " + curPri + " curPri " + curPri +
" oldPri " + oldPri); " oldPri " + oldPri);
} }
if(oldPri != curPri) { // oldPri is mostly correct, but not always. If not found with oldPri,
// other levels will be searched until the block is found & removed.
remove(block, oldPri); remove(block, oldPri);
}
if(priorityQueues.get(curPri).add(block)) { if(priorityQueues.get(curPri).add(block)) {
NameNode.blockStateChangeLog.debug( NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} " "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -72,6 +73,7 @@ public class TestDecommissioningStatus {
private static FileSystem fileSys; private static FileSystem fileSys;
private static HostsFileWriter hostsFileWriter; private static HostsFileWriter hostsFileWriter;
private static Configuration conf; private static Configuration conf;
private Logger LOG;
final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes); final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
@ -89,8 +91,7 @@ public void setUp() throws Exception {
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt( conf.setInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4); DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 4);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1); conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
@ -100,6 +101,7 @@ public void setUp() throws Exception {
cluster.getNamesystem().getBlockManager().getDatanodeManager() cluster.getNamesystem().getBlockManager().getDatanodeManager()
.setHeartbeatExpireInterval(3000); .setHeartbeatExpireInterval(3000);
Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG); Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
LOG = Logger.getLogger(TestDecommissioningStatus.class);
} }
@After @After
@ -366,4 +368,111 @@ public void testDecommissionDeadDN() throws Exception {
hostsFileWriter.initExcludeHost(""); hostsFileWriter.initExcludeHost("");
dm.refreshNodes(conf); dm.refreshNodes(conf);
} }
@Test(timeout=120000)
public void testDecommissionLosingData() throws Exception {
ArrayList<String> nodes = new ArrayList<String>(2);
FSNamesystem fsn = cluster.getNamesystem();
BlockManager bm = fsn.getBlockManager();
DatanodeManager dm = bm.getDatanodeManager();
Path file1 = new Path("decommissionLosingData.dat");
DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
(short)2, seed);
Thread.sleep(1000);
// Shutdown dn1
LOG.info("Shutdown dn1");
DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
String dnName = dnID.getXferAddr();
DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
nodes.add(dnName);
DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
false, 30000);
// Shutdown dn0
LOG.info("Shutdown dn0");
dnID = cluster.getDataNodes().get(0).getDatanodeId();
dnName = dnID.getXferAddr();
DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
nodes.add(dnName);
DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
false, 30000);
// Decommission the nodes.
LOG.info("Decommissioning nodes");
hostsFileWriter.initExcludeHosts(nodes);
dm.refreshNodes(conf);
BlockManagerTestUtil.recheckDecommissionState(dm);
assertTrue(dnDescriptor0.isDecommissioned());
assertTrue(dnDescriptor1.isDecommissioned());
// All nodes are dead and decommed. Blocks should be missing.
long missingBlocks = bm.getMissingBlocksCount();
long underreplicated = bm.getUnderReplicatedBlocksCount();
assertTrue(missingBlocks > 0);
assertTrue(underreplicated > 0);
// Bring back dn0
LOG.info("Bring back dn0");
cluster.restartDataNode(stoppedDN0, true);
do {
dnID = cluster.getDataNodes().get(0).getDatanodeId();
} while (dnID == null);
dnDescriptor0 = dm.getDatanode(dnID);
// Wait until it sends a block report.
while (dnDescriptor0.numBlocks() == 0) {
Thread.sleep(100);
}
// Bring back dn1
LOG.info("Bring back dn1");
cluster.restartDataNode(stoppedDN1, true);
do {
dnID = cluster.getDataNodes().get(1).getDatanodeId();
} while (dnID == null);
dnDescriptor1 = dm.getDatanode(dnID);
// Wait until it sends a block report.
while (dnDescriptor1.numBlocks() == 0) {
Thread.sleep(100);
}
// Blocks should be still be under-replicated
Thread.sleep(2000); // Let replication monitor run
assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
// Start up a node.
LOG.info("Starting two more nodes");
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
// Replication should fix it.
int count = 0;
while((bm.getUnderReplicatedBlocksCount() > 0 ||
bm.getPendingReconstructionBlocksCount() > 0) &&
count++ < 10) {
Thread.sleep(1000);
}
assertEquals(0, bm.getUnderReplicatedBlocksCount());
assertEquals(0, bm.getPendingReconstructionBlocksCount());
assertEquals(0, bm.getMissingBlocksCount());
// Shutdown the extra nodes.
dnID = cluster.getDataNodes().get(3).getDatanodeId();
cluster.stopDataNode(3);
DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
false, 30000);
dnID = cluster.getDataNodes().get(2).getDatanodeId();
cluster.stopDataNode(2);
DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
false, 30000);
// Call refreshNodes on FSNamesystem with empty exclude file to remove the
// datanode from decommissioning list and make it available again.
hostsFileWriter.initExcludeHost("");
dm.refreshNodes(conf);
fileSys.delete(file1, false);
}
} }