diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java
index 366e679e64..685e92d628 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SequentialNumber.java
@@ -45,6 +45,19 @@ public void setCurrentValue(long value) {
currentValue.set(value);
}
+ public boolean setIfGreater(long value) {
+ while(true) {
+ long local = currentValue.get();
+ if(value <= local) {
+ return false; // swap failed
+ }
+ if(currentValue.compareAndSet(local, value)) {
+ return true; // swap successful
+ }
+ // keep trying
+ }
+ }
+
/** Increment and then return the next value. */
public long nextValue() {
return currentValue.incrementAndGet();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index 5eebe8e2e5..bec6ec8368 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -46,6 +46,23 @@ public class BlockIdManager {
* The global generation stamp for this file system.
*/
private final GenerationStamp generationStamp = new GenerationStamp();
+ /**
+ * Most recent global generation stamp as seen on Active NameNode.
+ * Used by StandbyNode only.
+ * StandbyNode does not update its global {@link #generationStamp} during
+ * edits tailing. The global generation stamp on StandbyNode is updated
+ * - when the block with the next generation stamp is actually
+ * received
+ * - during fail-over it is bumped to the last value received from the
+ * Active NN through edits and stored as
+ * {@link #impendingGenerationStamp}
+ * The former helps to avoid a race condition with IBRs during edits tailing.
+ * The latter guarantees that generation stamps are never reused by new
+ * Active after fail-over.
+ * See HDFS-14941 for more details.
+ */
+ private final GenerationStamp impendingGenerationStamp
+ = new GenerationStamp();
/**
* The value of the generation stamp when the first switch to sequential
* block IDs was made. Blocks with generation stamps below this value
@@ -162,6 +179,35 @@ public void setGenerationStamp(long stamp) {
generationStamp.setCurrentValue(stamp);
}
+ /**
+ * Set the currently highest gen stamp from active. Used
+ * by Standby only.
+ * @param stamp new genstamp
+ */
+ public void setImpendingGenerationStamp(long stamp) {
+ impendingGenerationStamp.setIfGreater(stamp);
+ }
+
+ /**
+ * Set the current genstamp to the impending genstamp.
+ */
+ public void applyImpendingGenerationStamp() {
+ setGenerationStampIfGreater(impendingGenerationStamp.getCurrentValue());
+ }
+
+ @VisibleForTesting
+ public long getImpendingGenerationStamp() {
+ return impendingGenerationStamp.getCurrentValue();
+ }
+
+ /**
+ * Set genstamp only when the given one is higher.
+ * @param stamp
+ */
+ public void setGenerationStampIfGreater(long stamp) {
+ generationStamp.setIfGreater(stamp);
+ }
+
public long getGenerationStamp() {
return generationStamp.getCurrentValue();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6eec1e7499..c35087a0ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4738,7 +4738,9 @@ public long getHighestPriorityECBlockCount(){
public BlockInfo addBlockCollection(BlockInfo block,
BlockCollection bc) {
- return blocksMap.addBlockCollection(block, bc);
+ BlockInfo blockInfo = blocksMap.addBlockCollection(block, bc);
+ blockIdManager.setGenerationStampIfGreater(block.getGenerationStamp());
+ return blockInfo;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 9b0a64d75a..99a80b537c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -776,7 +776,7 @@ private static void persistNewBlock(
* @param targets target datanodes where replicas of the new block is placed
* @throws QuotaExceededException If addition of block exceeds space quota
*/
- private static void saveAllocatedBlock(FSNamesystem fsn, String src,
+ static void saveAllocatedBlock(FSNamesystem fsn, String src,
INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
BlockType blockType) throws IOException {
assert fsn.hasWriteLock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index ab0b2facfc..a8eb0dd07d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -867,8 +867,10 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
}
case OP_SET_GENSTAMP_V2: {
SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
- blockManager.getBlockIdManager().setGenerationStamp(
- setGenstampV2Op.genStampV2);
+ // update the impending gen stamp, but not the actual genstamp,
+ // see HDFS-14941
+ blockManager.getBlockIdManager()
+ .setImpendingGenerationStamp(setGenstampV2Op.genStampV2);
break;
}
case OP_ALLOCATE_BLOCK_ID: {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 1482f2cd91..e278a33990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -1836,7 +1836,15 @@ protected void toXml(ContentHandler contentHandler) throws SAXException {
}
}
- /** Similar with {@link SetGenstampV1Op} */
+ /**
+ * This operation does not actually update gen stamp immediately,
+ * the new gen stamp is recorded as impending gen stamp.
+ * The global generation stamp on Standby Node is updated when
+ * the block with the next generation stamp is actually received.
+ * We keep logging this operation for backward compatibility.
+ * The impending gen stamp will take effect when the standby
+ * transition to become an active.
+ */
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5209b98100..0bf2166545 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1248,6 +1248,7 @@ void startActiveServices() throws IOException {
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
+ blockManager.getBlockIdManager().applyImpendingGenerationStamp();
// Only need to re-process the queue, If not in SafeMode.
if (!isInSafeMode()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index ebd5faf502..f1b26abce6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.spy;
@@ -31,12 +32,15 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockType;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
@@ -202,6 +206,47 @@ public static long[] getStats(final FSNamesystem fsn) {
return fsn.getStats();
}
+ public static long getGenerationStamp(final FSNamesystem fsn)
+ throws IOException {
+ return fsn.getBlockManager().getBlockIdManager().getGenerationStamp();
+ }
+
+ public static long getImpendingGenerationStamp(final FSNamesystem fsn) {
+ return fsn.getBlockManager().getBlockIdManager()
+ .getImpendingGenerationStamp();
+ }
+
+ public static BlockInfo addBlockNoJournal(final FSNamesystem fsn,
+ final String src, final DatanodeStorageInfo[] targets)
+ throws IOException {
+ fsn.writeLock();
+ try {
+ INodeFile file = (INodeFile)fsn.getFSDirectory().getINode(src);
+ Block newBlock = fsn.createNewBlock(BlockType.CONTIGUOUS);
+ INodesInPath inodesInPath = INodesInPath.fromINode(file);
+ FSDirWriteFileOp.saveAllocatedBlock(
+ fsn, src, inodesInPath, newBlock, targets, BlockType.CONTIGUOUS);
+ return file.getLastBlock();
+ } finally {
+ fsn.writeUnlock();
+ }
+ }
+
+ public static void persistBlocks(final FSNamesystem fsn,
+ final String src, final INodeFile file) throws IOException {
+ fsn.writeLock();
+ try {
+ FSDirWriteFileOp.persistBlocks(fsn.getFSDirectory(), src, file, true);
+ } finally {
+ fsn.writeUnlock();
+ }
+ }
+
+ public static BlockInfo getStoredBlock(final FSNamesystem fsn,
+ final Block b) {
+ return fsn.getStoredBlock(b);
+ }
+
public static FSNamesystem spyOnNamesystem(NameNode nn) {
FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
FSNamesystem fsnOld = nn.namesystem;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestAddBlockTailing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestAddBlockTailing.java
new file mode 100644
index 0000000000..48c09eda79
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestAddBlockTailing.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests the race condition that IBR and add block may result
+ * in inconsistent block genstamp.
+ */
+public class TestAddBlockTailing {
+ private static final int BLOCK_SIZE = 8192;
+ private static final String TEST_DIR = "/TestAddBlockTailing";
+
+ private static MiniQJMHACluster qjmhaCluster;
+ private static MiniDFSCluster dfsCluster;
+ private static DistributedFileSystem dfs;
+ private static FSNamesystem fsn0;
+ private static FSNamesystem fsn1;
+ private static DataNode dn0;
+
+ @BeforeClass
+ public static void startUpCluster() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+ MiniQJMHACluster.Builder qjmBuilder = new MiniQJMHACluster.Builder(conf)
+ .setNumNameNodes(2);
+ qjmBuilder.getDfsBuilder().numDataNodes(1);
+ qjmhaCluster = qjmBuilder.build();
+ dfsCluster = qjmhaCluster.getDfsCluster();
+ dfsCluster.waitActive();
+ dfsCluster.transitionToActive(0);
+ dfs = dfsCluster.getFileSystem(0);
+ fsn0 = dfsCluster.getNameNode(0).getNamesystem();
+ fsn1 = dfsCluster.getNameNode(1).getNamesystem();
+ dfs.mkdirs(new Path(TEST_DIR), new FsPermission("755"));
+ dn0 = dfsCluster.getDataNodes().get(0);
+ }
+
+ @AfterClass
+ public static void shutDownCluster() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testStandbyAddBlockIBRRace() throws Exception {
+ String testFile = TEST_DIR +"/testStandbyAddBlockIBRRace";
+
+ // initial global generation stamp check
+ assertEquals("Global Generation stamps on NNs should be the same",
+ NameNodeAdapter.getGenerationStamp(fsn0),
+ NameNodeAdapter.getGenerationStamp(fsn1));
+
+ // create a file, add a block on NN0
+ // do not journal addBlock yet
+ dfs.create(new Path(testFile), true, dfs.getConf()
+ .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+ (short) 1, BLOCK_SIZE);
+ DatanodeManager dnManager = fsn0.getBlockManager().getDatanodeManager();
+ DatanodeStorageInfo[] targets =
+ dnManager.getDatanode(dn0.getDatanodeId()).getStorageInfos();
+ targets = new DatanodeStorageInfo[] {targets[0]};
+ BlockInfo newBlock = NameNodeAdapter.addBlockNoJournal(
+ fsn0, testFile, targets);
+
+ // NN1 tails increment generation stamp transaction
+ fsn0.getEditLog().logSync();
+ fsn1.getEditLogTailer().doTailEdits();
+
+ assertEquals("Global Generation stamps on NN0 and "
+ + "impending on NN1 should be equal",
+ NameNodeAdapter.getGenerationStamp(fsn0),
+ NameNodeAdapter.getImpendingGenerationStamp(fsn1));
+
+ // NN1 processes IBR with the replica
+ StorageReceivedDeletedBlocks[] report = DFSTestUtil
+ .makeReportForReceivedBlock(newBlock,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
+ dn0.getFSDataset().getStorage(targets[0].getStorageID()));
+ fsn1.processIncrementalBlockReport(dn0.getDatanodeId(), report[0]);
+
+ // NN0 persists the block, i.e adds update block transaction
+ INodeFile file = (INodeFile)fsn0.getFSDirectory().getINode(testFile);
+ NameNodeAdapter.persistBlocks(fsn0, testFile, file);
+
+ // NN1 tails update block transaction
+ fsn0.getEditLog().logSync();
+ fsn1.getEditLogTailer().doTailEdits();
+
+ assertEquals("Global Generation stamps on NN0 and "
+ + "impending on NN1 should be equal",
+ NameNodeAdapter.getGenerationStamp(fsn0),
+ NameNodeAdapter.getImpendingGenerationStamp(fsn1));
+
+ // The new block on NN1 should have the replica
+ BlockInfo newBlock1 = NameNodeAdapter.getStoredBlock(fsn1, newBlock);
+ assertTrue("New block on NN1 should contain the replica",
+ newBlock1.getStorageInfos().hasNext());
+ assertEquals("Generation stamps of the block on NNs should be the same",
+ newBlock.getGenerationStamp(), newBlock1.getGenerationStamp());
+ assertEquals("Global Generation stamps on NNs should be the same",
+ NameNodeAdapter.getGenerationStamp(fsn0),
+ NameNodeAdapter.getGenerationStamp(fsn1));
+
+ // Check that the generation stamp restores on Standby after failover
+ ClientProtocol rpc0 = dfsCluster.getNameNode(0).getRpcServer();
+ ClientProtocol rpc1 = dfsCluster.getNameNode(1).getRpcServer();
+ LocatedBlock lb = rpc0.getBlockLocations(testFile, 0, 0).get(0);
+ rpc0.updateBlockForPipeline(lb.getBlock(), dfs.getClient().getClientName());
+ long gs0 = NameNodeAdapter.getGenerationStamp(fsn0);
+ dfsCluster.transitionToStandby(0);
+ dfsCluster.transitionToActive(1);
+ assertEquals("Global Generation stamps on new active should be "
+ + "the same as on the old one", gs0,
+ NameNodeAdapter.getGenerationStamp(fsn1));
+
+ rpc1.delete(testFile, false);
+ }
+}