diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index c7046708ba..aa00d05bad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2968,12 +2968,12 @@ LocatedBlock getAdditionalBlock( LocatedBlock[] onRetryBlock = new LocatedBlock[1]; FSDirWriteFileOp.ValidateAddBlockResult r; - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker.setOperationType(operationName); readLock(); try { - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName, previous, onRetryBlock); } finally { @@ -3019,12 +3019,15 @@ LocatedBlock getAdditionalDatanode(String src, long fileId, final byte storagePolicyID; final List chosen; final BlockType blockType; - checkOperation(OperationCategory.READ); + checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); FSPermissionChecker.setOperationType(null); readLock(); try { - checkOperation(OperationCategory.READ); + // Changing this operation category to WRITE instead of making getAdditionalDatanode as a + // read method is aim to let Active NameNode to handle this RPC, because Active NameNode + // contains a more complete DN selection context than Observer NameNode. + checkOperation(OperationCategory.WRITE); //check safe mode checkNameNodeSafeMode("Cannot add datanode; src=" + src + ", blk=" + blk); final INodesInPath iip = dir.resolvePath(pc, src, fileId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java index 74d85bc637..2b6c2a1890 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java @@ -28,22 +28,32 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Test; +import org.junit.jupiter.api.Timeout; public class TestNameNodeRpcServer { @@ -91,6 +101,43 @@ private static String getPreferredLocation(DistributedFileSystem fs, // trials. 1/3^20=3e-10, so that should be good enough. static final int ITERATIONS_TO_USE = 20; + @Test + @Timeout(30000) + public void testObserverHandleAddBlock() throws Exception { + String baseDir = GenericTestUtils.getRandomizedTempPath(); + Configuration conf = new HdfsConfiguration(); + MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf).setNumNameNodes(3); + builder.getDfsBuilder().numDataNodes(3); + try (MiniQJMHACluster qjmhaCluster = builder.baseDir(baseDir).build()) { + MiniDFSCluster dfsCluster = qjmhaCluster.getDfsCluster(); + dfsCluster.waitActive(); + dfsCluster.transitionToActive(0); + dfsCluster.transitionToObserver(2); + + NameNode activeNN = dfsCluster.getNameNode(0); + NameNode observerNN = dfsCluster.getNameNode(2); + + // Stop the editLogTailer of Observer NameNode + observerNN.getNamesystem().getEditLogTailer().stop(); + DistributedFileSystem dfs = dfsCluster.getFileSystem(0); + + Path testPath = new Path("/testObserverHandleAddBlock/file.txt"); + try (FSDataOutputStream ignore = dfs.create(testPath)) { + HdfsFileStatus fileStatus = activeNN.getRpcServer().getFileInfo(testPath.toUri().getPath()); + assertNotNull(fileStatus); + assertNull(observerNN.getRpcServer().getFileInfo(testPath.toUri().getPath())); + + LambdaTestUtils.intercept(ObserverRetryOnActiveException.class, () -> { + observerNN.getRpcServer().addBlock(testPath.toUri().getPath(), + dfs.getClient().getClientName(), null, null, + fileStatus.getFileId(), null, EnumSet.noneOf(AddBlockFlag.class)); + }); + } finally { + dfs.delete(testPath, true); + } + } + } + /** * A test to make sure that if an authorized user adds "clientIp:" to their * caller context, it will be used to make locality decisions on the NN.