From 833fc64558560b73346699c8a5d868f1a9a29571 Mon Sep 17 00:00:00 2001 From: Steve Vaughan Date: Sun, 28 Aug 2022 14:15:04 -0400 Subject: [PATCH] HDFS-16684. Exclude the current JournalNode (#4786) The JournalNodeSyncer will include the local instance in syncing when using a bind host (e.g. 0.0.0.0). There is a mechanism that is supposed to exclude the local instance, but it doesn't recognize the meta-address as a local address. Running with bind addresses set to 0.0.0.0, the JournalNodeSyncer will log attempts to sync with itself as part of the normal syncing rotation. For an HA configuration running 3 JournalNodes, the "other" list used by the JournalNodeSyncer will include 3 proxies. Exclude bound local addresses, including the use of a wildcard address in the bound host configurations, while still allowing multiple instances on the same host. Allow sync attempts with unresolved addresses, so that sync attempts can drive resolution as servers become available. Backport. Signed-off-by: stack --- .../hdfs/qjournal/server/JournalNode.java | 5 +++ .../qjournal/server/JournalNodeSyncer.java | 23 ++++++++-- .../qjournal/server/TestJournalNodeSync.java | 44 +++++++++++++++++-- 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index 4ba880bc9c..0b5932d70f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -122,6 +122,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { return journal; } + @VisibleForTesting + public JournalNodeSyncer getJournalSyncer(String jid) { + return journalSyncersById.get(jid); + } + @VisibleForTesting public boolean 
getJournalSyncerStatus(String jid) { if (journalSyncersById.get(jid) != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 08c21c65c0..6e861e62aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.qjournal.server; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; @@ -54,6 +55,7 @@ import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Set; /** * A Journal Sync thread runs through the lifetime of the JN. It periodically @@ -153,6 +155,9 @@ public class JournalNodeSyncer { LOG.warn("Could not add proxy for Journal at addresss " + addr, e); } } + // Check if there are any other JournalNodes before starting the sync. Although some proxies + // may be unresolved now, the act of attempting to sync will instigate resolution when the + // servers become available. 
if (otherJNProxies.isEmpty()) { LOG.error("Cannot sync as there is no other JN available for sync."); return false; @@ -310,12 +315,24 @@ public class JournalNodeSyncer { return null; } - private List getJournalAddrList(String uriStr) throws + @VisibleForTesting + protected List getJournalAddrList(String uriStr) throws URISyntaxException, IOException { URI uri = new URI(uriStr); - return Util.getLoggerAddresses(uri, - Sets.newHashSet(jn.getBoundIpcAddress())); + + InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress(); + Set excluded = Sets.newHashSet(boundIpcAddress); + List addrList = Util.getLoggerAddresses(uri, excluded); + + // Exclude the current JournalNode instance (a local address and the same port). If the address + // is bound to a local address on the same port, then remove it to handle scenarios where a + // wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses + // since we may be running multiple servers on the same host. + addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress() + && boundIpcAddress.getPort() == addr.getPort()); + + return addrList; } private void getMissingLogSegments(List thisJournalEditLogs, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 8d9aee77ec..ee25a27b8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.qjournal.server; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.util.function.Supplier; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import 
org.apache.hadoop.conf.Configuration; @@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager .getLogFile; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -96,12 +99,45 @@ public class TestJournalNodeSync { } } + /** + * Test that the "self exclusion" works when there are multiple JournalNode instances running on + * the same server, but on different ports. + */ + @Test + public void testJournalNodeExcludesSelfMultilpePorts() throws URISyntaxException, IOException { + String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString(); + JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1"); + + // Test: Get the Journal address list for the default configuration + List addrList = syncer.getJournalAddrList(uri); + + // Verify: One of the addresses should be excluded so that the node isn't syncing with itself + assertEquals(2, addrList.size()); + } + + /** + * Test that the "self exclusion" works when a host uses a wildcard address. + */ + @Test + public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException { + String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString(); + JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1"); + + // Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly + // used as a bind host. 
+ String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0"); + List boundHostAddrList = syncer.getJournalAddrList(boundHostUri); + + // Verify: One of the addresses should be excluded so that the node isn't syncing with itself + assertEquals(2, boundHostAddrList.size()); + } + @Test(timeout=30000) public void testJournalNodeSync() throws Exception { //As by default 3 journal nodes are started; for(int i=0; i<3; i++) { - Assert.assertEquals(true, + assertEquals(true, jCluster.getJournalNode(i).getJournalSyncerStatus("ns1")); } @@ -386,13 +422,13 @@ public class TestJournalNodeSync { HdfsConstants.RollingUpgradeAction.PREPARE); //query rolling upgrade - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); // Restart the Standby NN with rollingUpgrade option dfsCluster.restartNameNode(standbyNNindex, true, "-rollingUpgrade", "started"); - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); // Do some edits and delete some edit logs @@ -420,7 +456,7 @@ public class TestJournalNodeSync { // Restart the current standby NN (previously active) dfsCluster.restartNameNode(standbyNNindex, true, "-rollingUpgrade", "started"); - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); dfsCluster.waitActive();