diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8e64602b42..d2a0d1115e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -369,6 +369,9 @@ Release 0.23.1 - UNRELEASED HDFS-2840. TestHostnameFilter should work with localhost or localhost.localdomain (tucu) + HDFS-2791. If block report races with closing of file, replica is + incorrectly marked corrupt. (todd) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 99a7f339c8..1630c6f492 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1537,7 +1537,24 @@ private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState, } case RBW: case RWR: - return storedBlock.isComplete(); + if (!storedBlock.isComplete()) { + return false; + } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) { + return true; + } else { // COMPLETE block, same genstamp + if (reportedState == ReplicaState.RBW) { + // If it's a RBW report for a COMPLETE block, it may just be that + // the block report got a little bit delayed after the pipeline + // closed. So, ignore this report, assuming we will get a + // FINALIZED replica later. See HDFS-2791 + LOG.info("Received an RBW replica for block " + storedBlock + + " on " + dn.getName() + ": ignoring it, since the block is " + + "complete with the same generation stamp."); + return false; + } else { + return true; + } + } case RUR: // should not be reported case TEMPORARY: // should not be reported default: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index fd9d4c0321..ab8c8def84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -782,4 +782,13 @@ private void startDistributedUpgradeIfNeeded() throws IOException { return; } -} \ No newline at end of file + @VisibleForTesting + DatanodeProtocolClientSideTranslatorPB getBpNamenode() { + return bpNamenode; + } + + @VisibleForTesting + void setBpNamenode(DatanodeProtocolClientSideTranslatorPB bpNamenode) { + this.bpNamenode = bpNamenode; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java index 384cfe75b9..50a34a8a04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java @@ -101,7 +101,7 @@ public static FileSystem createHdfsWithDifferentUsername(final Configuration con return DFSTestUtil.getFileSystemAs(ugi, conf); } - static void write(OutputStream out, int offset, int length) throws IOException { + public static void write(OutputStream out, int offset, int length) throws IOException { final byte[] bytes = new byte[length]; for(int i = 0; i < length; i++) { bytes[i] = (byte)(offset + i); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java index 8595c94d34..fb015a2f73 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeAdapter.java @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.mockito.Mockito; + +import com.google.common.base.Preconditions; + /** * WARNING!! This is TEST ONLY class: it never has to be used * for ANY development purposes. @@ -42,4 +50,29 @@ public static void setHeartbeatsDisabledForTests(DataNode dn, boolean heartbeatsDisabledForTests) { dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests); } + + /** + * Insert a Mockito spy object between the given DataNode and + * the given NameNode. This can be used to delay or wait for + * RPC calls on the datanode->NN path. + */ + public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN( + DataNode dn, NameNode nn) { + String bpid = nn.getNamesystem().getBlockPoolId(); + + BPOfferService bpos = null; + for (BPOfferService thisBpos : dn.getAllBpOs()) { + if (thisBpos.getBlockPoolId().equals(bpid)) { + bpos = thisBpos; + break; + } + } + Preconditions.checkArgument(bpos != null, + "No such bpid: %s", bpid); + + DatanodeProtocolClientSideTranslatorPB origNN = bpos.getBpNamenode(); + DatanodeProtocolClientSideTranslatorPB spy = Mockito.spy(origNN); + bpos.setBpNamenode(spy); + return spy; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index 5fafc7788e..82e24e65c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -21,7 +21,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -30,19 +32,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; import org.apache.log4j.Level; import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import java.io.File; import java.io.FilenameFilter; @@ -50,6 +57,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; /** * This test simulates a variety of situations when blocks are being @@ -491,6 +499,84 @@ public void blockReport_09() throws IOException { resetConfiguration(); // return the initial state of the configuration } } + + /** + * Test for the case where one of the DNs in the pipeline is in the + * process of doing a block report exactly when the block is closed. + * In this case, the block report becomes delayed until after the + * block is marked completed on the NN, and hence it reports an RBW + * replica for a COMPLETE block. Such a report should not be marked + * corrupt. + * This is a regression test for HDFS-2791. + */ + @Test + public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception { + final CountDownLatch brFinished = new CountDownLatch(1); + DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) { + @Override + protected Object passThrough(InvocationOnMock invocation) + throws Throwable { + try { + return super.passThrough(invocation); + } finally { + // inform the test that our block report went through. + brFinished.countDown(); + } + } + }; + + final String METHOD_NAME = GenericTestUtils.getMethodName(); + Path filePath = new Path("/" + METHOD_NAME + ".dat"); + + // Start a second DN for this test -- we're checking + // what happens when one of the DNs is slowed for some reason. + REPL_FACTOR = 2; + startDNandWait(null, false); + + NameNode nn = cluster.getNameNode(); + + FSDataOutputStream out = fs.create(filePath, REPL_FACTOR); + try { + AppendTestUtil.write(out, 0, 10); + out.hflush(); + + // Set up a spy so that we can delay the block report coming + // from this node. + DataNode dn = cluster.getDataNodes().get(0); + DatanodeProtocolClientSideTranslatorPB spy = + DataNodeAdapter.spyOnBposToNN(dn, nn); + + Mockito.doAnswer(delayer) + .when(spy).blockReport( + Mockito.anyObject(), + Mockito.anyString(), + Mockito.anyObject()); + + // Force a block report to be generated. The block report will have + // an RBW replica in it. Wait for the RPC to be sent, but block + // it before it gets to the NN. + dn.scheduleAllBlockReport(0); + delayer.waitForCall(); + + } finally { + IOUtils.closeStream(out); + } + + // Now that the stream is closed, the NN will have the block in COMPLETE + // state. + delayer.proceed(); + brFinished.await(); + + // Verify that no replicas are marked corrupt, and that the + // file is still readable. + BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager()); + assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks()); + DFSTestUtil.readFile(fs, filePath); + + // Ensure that the file is readable even from the DN that we futzed with. + cluster.stopDataNode(1); + DFSTestUtil.readFile(fs, filePath); + } private void waitForTempReplica(Block bl, int DN_N1) throws IOException { final boolean tooLongWait = false;