HDFS-5438. Flaws in block report processing can cause data loss. Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542054 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-11-14 20:11:50 +00:00
parent 1a76ccbbc4
commit ceea91c9cd
9 changed files with 211 additions and 30 deletions

View File

@ -560,6 +560,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold,
leads to NN safemode. (Vinay via jing9)
HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -42,4 +42,8 @@ public boolean corruptPacket() {
public boolean uncorruptPacket() {
return false;
}
public boolean failPacket() {
return false;
}
}

View File

@ -151,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy;
private boolean failPacket = false;
private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
@ -743,6 +744,16 @@ public void run() {
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;
// Fail the packet write for testing in order to force a
// pipeline recovery.
if (DFSClientFaultInjector.get().failPacket() &&
isLastPacketInBlock) {
failPacket = true;
throw new IOException(
"Failing the last packet for testing.");
}
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
@ -1027,8 +1038,19 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
if (failPacket) { // for testing
success = createBlockOutputStream(nodes, newGS-1, isRecovery);
failPacket = false;
try {
// Give DNs time to send in bad reports. In real situations,
// good reports should follow bad ones, if client committed
// with those nodes.
Thread.sleep(2000);
} catch (InterruptedException ie) {}
} else {
success = createBlockOutputStream(nodes, newGS, isRecovery);
}
}
if (success) {
// update pipeline at the namenode
@ -1886,7 +1908,9 @@ public synchronized void close() throws IOException {
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.now();
long localTimeout = 400;
boolean fileComplete = false;
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
while (!fileComplete) {
fileComplete =
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
@ -1902,7 +1926,13 @@ private void completeFile(ExtendedBlock last) throws IOException {
throw new IOException(msg);
}
try {
Thread.sleep(400);
Thread.sleep(localTimeout);
if (retries == 0) {
throw new IOException("Unable to close file because the last block"
+ " does not have enough number of replicas.");
}
retries--;
localTimeout *= 2;
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
}

View File

@ -229,6 +229,29 @@ public long getBlockRecoveryId() {
return blockRecoveryId;
}
/**
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
* @param genStamp The final generation stamp for the block.
*/
public void setGenerationStampAndVerifyReplicas(long genStamp) {
if (replicas == null)
return;
// Remove the replicas with wrong gen stamp.
// The replica list is unchanged.
for (ReplicaUnderConstruction r : replicas) {
if (genStamp != r.getGenerationStamp()) {
r.getExpectedLocation().removeBlock(this);
NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+ "from location: " + r);
}
}
// Set the generation stamp for the block.
setGenerationStamp(genStamp);
}
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
@ -295,9 +318,13 @@ public void initializeBlockRecovery(long recoveryId) {
void addReplicaIfNotPresent(DatanodeDescriptor dn,
Block block,
ReplicaState rState) {
for(ReplicaUnderConstruction r : replicas)
if(r.getExpectedLocation() == dn)
for (ReplicaUnderConstruction r : replicas) {
if (r.getExpectedLocation() == dn) {
// Record the gen stamp from the report
r.setGenerationStamp(block.getGenerationStamp());
return;
}
}
replicas.add(new ReplicaUnderConstruction(block, dn, rState));
}

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -1046,7 +1047,8 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
+ blk + " not found");
return;
}
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
Reason.CORRUPTION_REPORTED), dn);
}
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@ -1069,7 +1071,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
node.addBlock(b.stored);
// Add this replica to corruptReplicas Map
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
b.reasonCode);
if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
@ -1570,22 +1573,27 @@ private static class BlockToMarkCorrupt {
final BlockInfo stored;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
BlockToMarkCorrupt(BlockInfo stored, String reason) {
this(stored, stored, reason);
BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
this(stored, stored, reason, reasonCode);
}
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
this(new BlockInfo(stored), stored, reason);
BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
Reason reasonCode) {
this(new BlockInfo(stored), stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@ -1930,9 +1938,11 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
return storedBlock;
}
//add replica if appropriate
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& storedBlock.findDatanode(dn) < 0) {
&& (storedBlock.findDatanode(dn) < 0
|| corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
return storedBlock;
@ -2023,12 +2033,13 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp());
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes());
"length in block map " + storedBlock.getNumBytes(),
Reason.SIZE_MISMATCH);
} else {
return null; // not corrupt
}
@ -2044,7 +2055,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
+ storedBlock.getGenerationStamp());
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
@ -2057,7 +2068,8 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return null;
} else {
return new BlockToMarkCorrupt(storedBlock,
"reported replica has invalid state " + reportedState);
"reported replica has invalid state " + reportedState,
Reason.INVALID_STATE);
}
}
case RUR: // should not be reported
@ -2068,7 +2080,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
return new BlockToMarkCorrupt(storedBlock, msg);
return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
}
}
@ -2184,6 +2196,11 @@ private Block addStoredBlock(final BlockInfo block,
logAddStoredBlock(storedBlock, node);
}
} else {
// if the same block is added again and the replica was corrupt
// previously because of a wrong gen stamp, remove it from the
// corrupt block list.
corruptReplicas.removeFromCorruptReplicasMap(block, node,
Reason.GENSTAMP_MISMATCH);
curReplicaDelta = 0;
blockLog.warn("BLOCK* addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
@ -2280,7 +2297,8 @@ private void invalidateCorruptReplicas(BlockInfo blk) {
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
Reason.ANY), node)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {

View File

@ -36,8 +36,18 @@
@InterfaceAudience.Private
public class CorruptReplicasMap{
private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
new TreeMap<Block, Collection<DatanodeDescriptor>>();
/** The corruption reason code */
public static enum Reason {
NONE, // not specified.
ANY, // wildcard reason
GENSTAMP_MISMATCH, // mismatch in generation stamps
SIZE_MISMATCH, // mismatch in sizes
INVALID_STATE, // invalid state
CORRUPTION_REPORTED // client or datanode reported the corruption
}
private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
/**
* Mark the block belonging to datanode as corrupt.
@ -48,9 +58,22 @@ public class CorruptReplicasMap{
*/
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason) {
Collection<DatanodeDescriptor> nodes = getNodes(blk);
addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
}
/**
* Mark the block belonging to datanode as corrupt.
*
* @param blk Block to be added to CorruptReplicasMap
* @param dn DatanodeDescriptor which holds the corrupt replica
* @param reason a textual reason (for logging purposes)
* @param reasonCode the enum representation of the reason
*/
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason, Reason reasonCode) {
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
nodes = new TreeSet<DatanodeDescriptor>();
nodes = new HashMap<DatanodeDescriptor, Reason>();
corruptReplicasMap.put(blk, nodes);
}
@ -61,8 +84,7 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
reasonText = "";
}
if (!nodes.contains(dn)) {
nodes.add(dn);
if (!nodes.keySet().contains(dn)) {
NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() +
" added as corrupt on " + dn +
@ -76,6 +98,8 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
" by " + Server.getRemoteIp() +
reasonText);
}
// Add the node or update the reason.
nodes.put(dn, reasonCode);
}
/**
@ -97,10 +121,24 @@ void removeFromCorruptReplicasMap(Block blk) {
false if the replica is not in the map
*/
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
}
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
Reason reason) {
Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
boolean removed = false;
if (datanodes==null)
return false;
if (datanodes.remove(datanode)) { // remove the replicas
// if reasons can be compared but don't match, return false.
Reason storedReason = datanodes.get(datanode);
if (reason != Reason.ANY && storedReason != null &&
reason != storedReason) {
return false;
}
if (datanodes.remove(datanode) != null) { // remove the replicas
if (datanodes.isEmpty()) {
// remove the block if there is no more corrupted replicas
corruptReplicasMap.remove(blk);
@ -118,7 +156,10 @@ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
* @return collection of nodes. Null if does not exists
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
return corruptReplicasMap.get(blk);
Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null)
return null;
return nodes.keySet();
}
/**

View File

@ -5905,8 +5905,8 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
}
// Update old block with the new generation stamp and new length
blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
blockinfo.setNumBytes(newBlock.getNumBytes());
blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
// find the DatanodeDescriptor objects
final DatanodeManager dm = getBlockManager().getDatanodeManager();

View File

@ -26,10 +26,14 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
/**
* This tests pipeline recovery related client protocol works correct or not.
*/
@ -112,4 +116,55 @@ public class TestClientProtocolForPipelineRecovery {
cluster.shutdown();
}
}
/** Test whether corrupt replicas are detected correctly during pipeline
* recoveries.
*/
@Test
public void testPipelineRecoveryForLastBlock() throws IOException {
DFSClientFaultInjector faultInjector
= Mockito.mock(DFSClientFaultInjector.class);
DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
DFSClientFaultInjector.instance = faultInjector;
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
MiniDFSCluster cluster = null;
try {
int numDataNodes = 3;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("dataprotocol1.dat");
Mockito.when(faultInjector.failPacket()).thenReturn(true);
try {
DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
} catch (IOException e) {
// completeFile() should fail.
Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
return;
}
// At this point, NN let data corruption to happen.
// Before failing test, try reading the file. It should fail.
FSDataInputStream in = fileSys.open(file);
try {
int c = in.read();
// Test will fail with BlockMissingException if NN does not update the
// replica state based on the latest report.
} catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
Assert.fail("Block is missing because the file was closed with"
+ " corrupt replicas.");
}
Assert.fail("The file was closed with corrupt replicas, but read still"
+ " works!");
} finally {
DFSClientFaultInjector.instance = oldInjector;
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -93,6 +93,10 @@ public void testCorruptFilesJsp() throws Exception {
in.close();
}
try {
Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
} catch (InterruptedException ie) {}
// verify if all corrupt files were reported to NN
badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),