diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index ccffcd0c70..a26902f5de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3044,12 +3044,12 @@ LocatedBlock getAdditionalBlock( LocatedBlock[] onRetryBlock = new LocatedBlock[1]; FSDirWriteFileOp.ValidateAddBlockResult r; - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker.setOperationType(operationName); readLock(); try { - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName, previous, onRetryBlock); } finally { @@ -3095,12 +3095,15 @@ LocatedBlock getAdditionalDatanode(String src, long fileId, final byte storagePolicyID; final List chosen; final BlockType blockType; - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker.setOperationType(null); readLock(); try { - checkOperation(OperationCategory.READ); + // Changing this operation category to WRITE instead of making getAdditionalDatanode as a + // read method is aim to let Active NameNode to handle this RPC, because Active NameNode + // contains a more complete DN selection context than Observer NameNode. + checkOperation(OperationCategory.WRITE); //check safe mode checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk); final INodesInPath iip = dir.resolvePath(pc, src, fileId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java index 2960a7ee6d..d29e11cffe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java @@ -29,26 +29,32 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.PrivilegedExceptionAction; +import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Test; import org.junit.jupiter.api.Timeout; @@ -158,6 +164,43 @@ public void testNamenodeRpcClientIpProxyWithFailBack() throws Exception { } } + @Test + @Timeout(30000) + public void testObserverHandleAddBlock() throws Exception { + String baseDir = GenericTestUtils.getRandomizedTempPath(); + Configuration conf = new HdfsConfiguration(); + MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf).setNumNameNodes(3); + builder.getDfsBuilder().numDataNodes(3); + try (MiniQJMHACluster qjmhaCluster = builder.baseDir(baseDir).build()) { + MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster(); + dfsCluster.waitActive(); + dfsCluster.transitionToActive(0); + dfsCluster.transitionToObserver(2); + + NameNode activeNN = dfsCluster.getNameNode(0); + NameNode observerNN = dfsCluster.getNameNode(2); + + // Stop the editLogTailer of Observer NameNode + observerNN.getNamesystem().getEditLogTailer().stop(); + DistributedFileSystem dfs = dfsCluster.getFileSystem(0); + + Path testPath = new Path("/testObserverHandleAddBlock/file.txt"); + try (FSDataOutputStream ignore = dfs.create(testPath)) { + HdfsFileStatus fileStatus = activeNN.getRpcServer().getFileInfo(testPath.toUri().getPath()); + assertNotNull(fileStatus); + assertNull(observerNN.getRpcServer().getFileInfo(testPath.toUri().getPath())); + + LambdaTestUtils.intercept(ObserverRetryOnActiveException.class, () -> { + observerNN.getRpcServer().addBlock(testPath.toUri().getPath(), + dfs.getClient().getClientName(), null, null, + fileStatus.getFileId(), null, EnumSet.noneOf(AddBlockFlag.class)); + }); + } finally { + dfs.delete(testPath, true); + } + } + } + /** * A test to make sure that if an authorized user adds "clientIp:" to their * caller context, it will be used to make locality decisions on the NN.