diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 260d7a21ab..953e48a932 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -1808,7 +1808,7 @@ BatchedEntries listOpenFiles(long prevId, * @throws IOException */ @Idempotent - @ReadOnly(isCoordinated = true) + @ReadOnly(activeOnly = true) void msync() throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index fe5345daff..2845670d44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -150,7 +151,51 @@ public void testMsyncSimple() throws Exception { assertEquals(1, readStatus.get()); } - // @Ignore("Move to another test file") + @Test + public void testMsync() throws Exception { + // 0 == not completed, 1 == succeeded, -1 == failed + AtomicInteger readStatus = new AtomicInteger(0); + Configuration conf2 = new Configuration(conf); + + // Disable FS cache so two different DFS clients will be used. + conf2.setBoolean("fs.hdfs.impl.disable.cache", true); + DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2); + + // Initialize the proxies for Observer Node. + dfs.getClient().getHAServiceState(); + dfs2.getClient().getHAServiceState(); + + // Advance Observer's state ID so it is ahead of client's. + dfs.mkdir(new Path("/test"), FsPermission.getDefault()); + dfsCluster.rollEditLogAndTail(0); + + dfs.mkdir(testPath, FsPermission.getDefault()); + assertSentTo(0); + + Thread reader = new Thread(() -> { + try { + // After msync, client should have the latest state ID from active. + // Therefore, the subsequent getFileStatus call should succeed. + dfs2.getClient().msync(); + dfs2.getFileStatus(testPath); + readStatus.set(1); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + + reader.start(); + + Thread.sleep(100); + assertEquals(0, readStatus.get()); + + dfsCluster.rollEditLogAndTail(0); + + GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000); + assertEquals(1, readStatus.get()); + } + @Test public void testUncoordinatedCall() throws Exception { // make a write call so that client will be ahead of