diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index df1314ac1e..3e8831d808 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -124,6 +124,11 @@ synchronized Journal getOrCreateJournal(String jid, return journal; } + @VisibleForTesting + public JournalNodeSyncer getJournalSyncer(String jid) { + return journalSyncersById.get(jid); + } + @VisibleForTesting public boolean getJournalSyncerStatus(String jid) { if (journalSyncersById.get(jid) != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index fd29c849df..f451b46de7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.qjournal.server; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -39,6 +40,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Lists; +import org.apache.hadoop.util.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,10 +52,10 @@ import java.net.URISyntaxException; import java.net.URL; import java.security.PrivilegedExceptionAction; -import java.util.Arrays; 
import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Set; /** * A Journal Sync thread runs through the lifetime of the JN. It periodically @@ -153,6 +155,9 @@ private boolean getOtherJournalNodeProxies() { LOG.warn("Could not add proxy for Journal at addresss " + addr, e); } } + // Check if there are any other JournalNodes before starting the sync. Although some proxies + // may be unresolved now, the act of attempting to sync will instigate resolution when the + // servers become available. if (otherJNProxies.isEmpty()) { LOG.error("Cannot sync as there is no other JN available for sync."); return false; @@ -310,12 +315,24 @@ private List getOtherJournalNodeAddrs() { return null; } - private List getJournalAddrList(String uriStr) throws + @VisibleForTesting + protected List getJournalAddrList(String uriStr) throws URISyntaxException, IOException { URI uri = new URI(uriStr); - return Util.getLoggerAddresses(uri, - new HashSet<>(Arrays.asList(jn.getBoundIpcAddress())), conf); + + InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress(); + Set excluded = Sets.newHashSet(boundIpcAddress); + List addrList = Util.getLoggerAddresses(uri, excluded, conf); + + // Exclude the current JournalNode instance (a local address and the same port). If the address + // is bound to a local address on the same port, then remove it to handle scenarios where a + // wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses + // since we may be running multiple servers on the same host. 
+ addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress() + && boundIpcAddress.getPort() == addr.getPort()); + + return addrList; } private void getMissingLogSegments(List thisJournalEditLogs, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 1564e41031..28e36e03bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.qjournal.server; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -34,6 +36,7 @@ import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager .getLogFile; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.test.GenericTestUtils; @@ -96,12 +99,45 @@ public void shutDownMiniCluster() throws IOException { } } + /** + * Test that the "self exclusion" works when there are multiple JournalNode instances running on + * the same server, but on different ports. 
+ */ + @Test + public void testJournalNodeExcludesSelfMultiplePorts() throws URISyntaxException, IOException { + String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString(); + JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1"); + + // Test: Get the Journal address list for the default configuration + List addrList = syncer.getJournalAddrList(uri); + + // Verify: One of the addresses should be excluded so that the node isn't syncing with itself + assertEquals(2, addrList.size()); + } + + /** + * Test that the "self exclusion" works when a host uses a wildcard address. + */ + @Test + public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException { + String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString(); + JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1"); + + // Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly + // used as a bind host. 
+ String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0"); + List boundHostAddrList = syncer.getJournalAddrList(boundHostUri); + + // Verify: One of the addresses should be excluded so that the node isn't syncing with itself + assertEquals(2, boundHostAddrList.size()); + } + @Test(timeout=30000) public void testJournalNodeSync() throws Exception { //As by default 3 journal nodes are started; for(int i=0; i<3; i++) { - Assert.assertEquals(true, + assertEquals(true, jCluster.getJournalNode(i).getJournalSyncerStatus("ns1")); } @@ -386,13 +422,13 @@ public void testSyncDuringRollingUpgrade() throws Exception { HdfsConstants.RollingUpgradeAction.PREPARE); //query rolling upgrade - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); // Restart the Standby NN with rollingUpgrade option dfsCluster.restartNameNode(standbyNNindex, true, "-rollingUpgrade", "started"); - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); // Do some edits and delete some edit logs @@ -420,7 +456,7 @@ public void testSyncDuringRollingUpgrade() throws Exception { // Restart the current standby NN (previously active) dfsCluster.restartNameNode(standbyNNindex, true, "-rollingUpgrade", "started"); - Assert.assertEquals(info, dfsActive.rollingUpgrade( + assertEquals(info, dfsActive.rollingUpgrade( HdfsConstants.RollingUpgradeAction.QUERY)); dfsCluster.waitActive();