diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 667cac9d0a..2eef17a505 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -560,6 +560,8 @@ Release 2.3.0 - UNRELEASED
     HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode
     threshold, leads to NN safemode. (Vinay via jing9)
 
+    HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 1738a4c1bc..c5c6d5c549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -42,4 +42,8 @@ public boolean corruptPacket() {
   public boolean uncorruptPacket() {
     return false;
   }
+
+  public boolean failPacket() {
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a1f3258d3a..362ffe6f42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -151,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   private CachingStrategy cachingStrategy;
+  private boolean failPacket = false;
 
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
@@ -743,6 +744,16 @@ public void run() {
                 one.seqno + " but received " + seqno);
           }
           isLastPacketInBlock = one.lastPacketInBlock;
+
+          // Fail the packet write for testing in order to force a
+          // pipeline recovery.
+          if (DFSClientFaultInjector.get().failPacket() &&
+              isLastPacketInBlock) {
+            failPacket = true;
+            throw new IOException(
+                "Failing the last packet for testing.");
+          }
+
           // update bytesAcked
           block.setNumBytes(one.getLastByteOffsetBlock());
 
@@ -1027,7 +1038,18 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
       accessToken = lb.getBlockToken();
 
       // set up the pipeline again with the remaining nodes
-      success = createBlockOutputStream(nodes, newGS, isRecovery);
+      if (failPacket) { // for testing
+        success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+        failPacket = false;
+        try {
+          // Give DNs time to send in bad reports. In real situations,
+          // good reports should follow bad ones, if client committed
+          // with those nodes.
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {}
+      } else {
+        success = createBlockOutputStream(nodes, newGS, isRecovery);
+      }
     }
 
     if (success) {
@@ -1886,7 +1908,9 @@ public synchronized void close() throws IOException {
   // be called during unit tests
   private void completeFile(ExtendedBlock last) throws IOException {
     long localstart = Time.now();
+    long localTimeout = 400;
     boolean fileComplete = false;
+    int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
     while (!fileComplete) {
       fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName,
           last, fileId);
@@ -1902,7 +1926,13 @@ private void completeFile(ExtendedBlock last) throws IOException {
         throw new IOException(msg);
       }
       try {
-        Thread.sleep(400);
+        Thread.sleep(localTimeout);
+        if (retries == 0) {
+          throw new IOException("Unable to close file because the last block"
+              + " does not have enough number of replicas.");
+        }
+        retries--;
+        localTimeout *= 2;
         if (Time.now() - localstart > 5000) {
           DFSClient.LOG.info("Could not complete " + src + " retrying...");
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index 243f753024..b05cfce1e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -229,6 +229,29 @@ public long getBlockRecoveryId() {
     return blockRecoveryId;
   }
 
+  /**
+   * Process the recorded replicas. When about to commit or finish the
+   * pipeline recovery sort out bad replicas.
+   * @param genStamp The final generation stamp for the block.
+   */
+  public void setGenerationStampAndVerifyReplicas(long genStamp) {
+    if (replicas == null)
+      return;
+
+    // Remove the replicas with wrong gen stamp.
+    // The replica list is unchanged.
+    for (ReplicaUnderConstruction r : replicas) {
+      if (genStamp != r.getGenerationStamp()) {
+        r.getExpectedLocation().removeBlock(this);
+        NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+            + "from location: " + r);
+      }
+    }
+
+    // Set the generation stamp for the block.
+    setGenerationStamp(genStamp);
+  }
+
   /**
    * Commit block's length and generation stamp as reported by the client.
    * Set block state to {@link BlockUCState#COMMITTED}.
@@ -295,9 +318,13 @@ public void initializeBlockRecovery(long recoveryId) {
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
                      Block block,
                      ReplicaState rState) {
-    for(ReplicaUnderConstruction r : replicas)
-      if(r.getExpectedLocation() == dn)
+    for (ReplicaUnderConstruction r : replicas) {
+      if (r.getExpectedLocation() == dn) {
+        // Record the gen stamp from the report
+        r.setGenerationStamp(block.getGenerationStamp());
         return;
+      }
+    }
     replicas.add(new ReplicaUnderConstruction(block, dn, rState));
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 35045548c1..7091e0d14a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -57,6 +57,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1046,7 +1047,8 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
           + blk + " not found");
       return;
     }
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
+        Reason.CORRUPTION_REPORTED), dn);
   }
 
   private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1069,7 +1071,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
     node.addBlock(b.stored);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+    corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+        b.reasonCode);
     if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
@@ -1570,22 +1573,27 @@ private static class BlockToMarkCorrupt {
     final BlockInfo stored;
     /** The reason to mark corrupt. */
     final String reason;
-
-    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+    /** The reason code to be stored */
+    final Reason reasonCode;
+
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+        Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
       Preconditions.checkNotNull(stored, "stored is null");
       this.corrupted = corrupted;
       this.stored = stored;
       this.reason = reason;
+      this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, String reason) {
-      this(stored, stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
+      this(stored, stored, reason, reasonCode);
     }
 
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
-      this(new BlockInfo(stored), stored, reason);
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+        Reason reasonCode) {
+      this(new BlockInfo(stored), stored, reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -1930,9 +1938,11 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
       return storedBlock;
     }
 
-    //add replica if appropriate
+    // Add replica if appropriate. If the replica was previously corrupt
+    // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && storedBlock.findDatanode(dn) < 0) {
+        && (storedBlock.findDatanode(dn) < 0
+        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;
@@ -2023,12 +2033,13 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
         return new BlockToMarkCorrupt(storedBlock, reportedGS,
             "block is " + ucState + " and reported genstamp " + reportedGS
             + " does not match genstamp in block map "
-            + storedBlock.getGenerationStamp());
+            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
         return new BlockToMarkCorrupt(storedBlock,
             "block is " + ucState + " and reported length " +
             reported.getNumBytes() + " does not match " +
-            "length in block map " + storedBlock.getNumBytes());
+            "length in block map " + storedBlock.getNumBytes(),
+            Reason.SIZE_MISMATCH);
       } else {
         return null; // not corrupt
       }
@@ -2044,7 +2055,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
         return new BlockToMarkCorrupt(storedBlock, reportedGS,
             "reported " + reportedState + " replica with genstamp " + reportedGS
             + " does not match COMPLETE block's genstamp in block map "
-            + storedBlock.getGenerationStamp());
+            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -2057,7 +2068,8 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
           return null;
         } else {
           return new BlockToMarkCorrupt(storedBlock,
-              "reported replica has invalid state " + reportedState);
+              "reported replica has invalid state " + reportedState,
+              Reason.INVALID_STATE);
         }
       }
     case RUR:       // should not be reported
@@ -2068,7 +2080,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg);
+      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
     }
   }
@@ -2184,6 +2196,11 @@ private Block addStoredBlock(final BlockInfo block,
         logAddStoredBlock(storedBlock, node);
       }
     } else {
+      // if the same block is added again and the replica was corrupt
+      // previously because of a wrong gen stamp, remove it from the
+      // corrupt block list.
+      corruptReplicas.removeFromCorruptReplicasMap(block, node,
+          Reason.GENSTAMP_MISMATCH);
       curReplicaDelta = 0;
       blockLog.warn("BLOCK* addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
@@ -2280,7 +2297,8 @@ private void invalidateCorruptReplicas(BlockInfo blk) {
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+              Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index 4613199ee6..cb9f79ab44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -36,8 +36,18 @@
 @InterfaceAudience.Private
 public class CorruptReplicasMap{
 
-  private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
-    new TreeMap<Block, Collection<DatanodeDescriptor>>();
+  /** The corruption reason code */
+  public static enum Reason {
+    NONE,                // not specified.
+    ANY,                 // wildcard reason
+    GENSTAMP_MISMATCH,   // mismatch in generation stamps
+    SIZE_MISMATCH,       // mismatch in sizes
+    INVALID_STATE,       // invalid state
+    CORRUPTION_REPORTED  // client or datanode reported the corruption
+  }
+
+  private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
 
   /**
    * Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
    */
   public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       String reason) {
-    Collection<DatanodeDescriptor> nodes = getNodes(blk);
+    addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+  }
+
+  /**
+   * Mark the block belonging to datanode as corrupt.
+   *
+   * @param blk Block to be added to CorruptReplicasMap
+   * @param dn DatanodeDescriptor which holds the corrupt replica
+   * @param reason a textual reason (for logging purposes)
+   * @param reasonCode the enum representation of the reason
+   */
+  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+      String reason, Reason reasonCode) {
+    Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
     if (nodes == null) {
-      nodes = new TreeSet<DatanodeDescriptor>();
+      nodes = new HashMap<DatanodeDescriptor, Reason>();
       corruptReplicasMap.put(blk, nodes);
     }
@@ -61,8 +84,7 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
       reasonText = "";
     }
 
-    if (!nodes.contains(dn)) {
-      nodes.add(dn);
+    if (!nodes.keySet().contains(dn)) {
       NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    blk.getBlockName() +
                                    " added as corrupt on " + dn +
@@ -76,6 +98,8 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
                                    " by " + Server.getRemoteIp() +
                                    reasonText);
     }
+    // Add the node or update the reason.
+    nodes.put(dn, reasonCode);
   }
 
   /**
@@ -97,10 +121,24 @@ void removeFromCorruptReplicasMap(Block blk) {
              false if the replica is not in the map
    */
   boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
-    Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+    return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+  }
+
+  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+      Reason reason) {
+    Map<DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+    boolean removed = false;
     if (datanodes==null)
       return false;
-    if (datanodes.remove(datanode)) { // remove the replicas
+
+    // if reasons can be compared but don't match, return false.
+    Reason storedReason = datanodes.get(datanode);
+    if (reason != Reason.ANY && storedReason != null &&
+        reason != storedReason) {
+      return false;
+    }
+
+    if (datanodes.remove(datanode) != null) { // remove the replicas
       if (datanodes.isEmpty()) {
         // remove the block if there is no more corrupted replicas
         corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
    * @return collection of nodes. Null if does not exists
    */
   Collection<DatanodeDescriptor> getNodes(Block blk) {
-    return corruptReplicasMap.get(blk);
+    Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+    if (nodes == null)
+      return null;
+    return nodes.keySet();
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8be6d274af..1fc1624c7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -5905,8 +5905,8 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
     blockinfo.setNumBytes(newBlock.getNumBytes());
+    blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects
     final DatanodeManager dm = getBlockManager().getDatanodeManager();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index ee39cfe533..0a8ed3eb7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -26,10 +26,14 @@
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
 /**
  * This tests pipeline recovery related client protocol works correct or not.
  */
@@ -112,4 +116,55 @@ public class TestClientProtocolForPipelineRecovery {
         cluster.shutdown();
       }
     }
+
+  /** Test whether corrupt replicas are detected correctly during pipeline
+   * recoveries.
+   */
+  @Test
+  public void testPipelineRecoveryForLastBlock() throws IOException {
+    DFSClientFaultInjector faultInjector
+        = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector oldInjector = DFSClientFaultInjector.instance;
+    DFSClientFaultInjector.instance = faultInjector;
+    Configuration conf = new HdfsConfiguration();
+
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+    MiniDFSCluster cluster = null;
+
+    try {
+      int numDataNodes = 3;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol1.dat");
+      Mockito.when(faultInjector.failPacket()).thenReturn(true);
+      try {
+        DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+      } catch (IOException e) {
+        // completeFile() should fail.
+        Assert.assertTrue(e.getMessage().startsWith("Unable to close file"));
+        return;
+      }
+
+      // At this point, NN let data corruption to happen.
+      // Before failing test, try reading the file. It should fail.
+      FSDataInputStream in = fileSys.open(file);
+      try {
+        int c = in.read();
+        // Test will fail with BlockMissingException if NN does not update the
+        // replica state based on the latest report.
+      } catch (org.apache.hadoop.hdfs.BlockMissingException bme) {
+        Assert.fail("Block is missing because the file was closed with"
+            + " corrupt replicas.");
+      }
+      Assert.fail("The file was closed with corrupt replicas, but read still"
+          + " works!");
+    } finally {
+      DFSClientFaultInjector.instance = oldInjector;
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
index d399ddf856..181161430d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
@@ -93,6 +93,10 @@ public void testCorruptFilesJsp() throws Exception {
         in.close();
       }
 
+      try {
+        Thread.sleep(3000); // Wait for block reports. They shouldn't matter.
+      } catch (InterruptedException ie) {}
+
       // verify if all corrupt files were reported to NN
       badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
       assertTrue("Expecting 3 corrupt files, but got " + badFiles.size(),