HDFS-16684. Exclude the current JournalNode (#4786)

The JournalNodeSyncer will include the local instance in syncing when using a bind host (e.g. 0.0.0.0).  There is a mechanism that is supposed to exclude the local instance, but it doesn't recognize the meta-address as a local address.
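For illustration, a minimal standalone sketch (not part of the patch; the port 8485 and the loopback address are placeholders) of why an equality check misses the match when the node is bound to the wildcard address:

    import java.net.InetSocketAddress;

    public class WildcardMatchSketch {
      public static void main(String[] args) {
        // The JournalNode binds its IPC server to the wildcard meta-address.
        InetSocketAddress bound = new InetSocketAddress("0.0.0.0", 8485);
        // The address derived from the shared-edits URI for the same node.
        InetSocketAddress fromUri = new InetSocketAddress("127.0.0.1", 8485);

        // Equality-based exclusion misses the match: the wildcard address does not
        // equal any concrete local address, so the node stays in its own proxy list.
        System.out.println(bound.equals(fromUri));                  // false
        System.out.println(bound.getAddress().isAnyLocalAddress()); // true
      }
    }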

Running with bind addresses set to 0.0.0.0, the JournalNodeSyncer will log attempts to sync with itself as part of the normal syncing rotation.  For an HA configuration running 3 JournalNodes, the "other" list used by the JournalNodeSyncer will include 3 proxies instead of the expected 2.

Exclude addresses that are bound locally, including the case where a wildcard address is used as the bind host, while still allowing multiple JournalNode instances on the same host.
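As a rough sketch of that rule (the helper name excludeSelf and the sample hosts and ports are hypothetical; the actual change is in JournalNodeSyncer#getJournalAddrList, shown in the diff below):

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;

    public class ExcludeSelfSketch {
      // Drop candidates bound to a wildcard (any-local) address on this node's IPC
      // port; keep other local addresses so several JournalNodes can share a host.
      static List<InetSocketAddress> excludeSelf(List<InetSocketAddress> candidates,
          InetSocketAddress boundIpcAddress) {
        List<InetSocketAddress> result = new ArrayList<>(candidates);
        result.removeIf(addr -> !addr.isUnresolved()
            && addr.getAddress().isAnyLocalAddress()
            && addr.getPort() == boundIpcAddress.getPort());
        return result;
      }

      public static void main(String[] args) {
        InetSocketAddress bound = new InetSocketAddress("0.0.0.0", 8485);
        List<InetSocketAddress> candidates = new ArrayList<>();
        candidates.add(new InetSocketAddress("0.0.0.0", 8485));    // this instance: removed
        candidates.add(new InetSocketAddress("127.0.0.1", 8486));  // same host, other port: kept
        candidates.add(InetSocketAddress.createUnresolved("jn2.example.com", 8485)); // kept
        System.out.println(excludeSelf(candidates, bound));        // two entries remain
      }
    }

Only wildcard-bound candidates on this node's own IPC port are dropped, so a second JournalNode listening on another port of the same host still gets a proxy.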

Allow sync attempts against unresolved addresses, so that each attempt can drive name resolution as servers become available.
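A small sketch of that behavior, assuming a placeholder host name that does not resolve yet (the real syncer keeps the unresolved InetSocketAddress in its proxy list and relies on later sync attempts to trigger resolution):

    import java.net.InetSocketAddress;

    public class UnresolvedRetrySketch {
      public static void main(String[] args) {
        // A JournalNode whose DNS name cannot be resolved yet is kept as an
        // unresolved address instead of being dropped from the sync list.
        InetSocketAddress addr =
            InetSocketAddress.createUnresolved("jn3.example.invalid", 8485);
        System.out.println(addr.isUnresolved()); // true

        // A later sync attempt can rebuild the address from the host name, which
        // performs a fresh DNS lookup once the server has become available.
        InetSocketAddress retried =
            new InetSocketAddress(addr.getHostName(), addr.getPort());
        System.out.println(retried.isUnresolved()); // still true until DNS resolves the name
      }
    }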

Backport.
Signed-off-by: stack <stack@apache.org>
Steve Vaughan 2022-08-28 14:15:04 -04:00 committed by GitHub
parent 3edddaf9fc
commit 833fc64558
3 changed files with 65 additions and 7 deletions

JournalNode.java

@@ -122,6 +122,11 @@ synchronized Journal getOrCreateJournal(String jid,
     return journal;
   }
 
+  @VisibleForTesting
+  public JournalNodeSyncer getJournalSyncer(String jid) {
+    return journalSyncersById.get(jid);
+  }
+
   @VisibleForTesting
   public boolean getJournalSyncerStatus(String jid) {
     if (journalSyncersById.get(jid) != null) {

JournalNodeSyncer.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
@@ -54,6 +55,7 @@
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * A Journal Sync thread runs through the lifetime of the JN. It periodically
@@ -153,6 +155,9 @@ private boolean getOtherJournalNodeProxies() {
         LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
       }
     }
+    // Check if there are any other JournalNodes before starting the sync. Although some proxies
+    // may be unresolved now, the act of attempting to sync will instigate resolution when the
+    // servers become available.
     if (otherJNProxies.isEmpty()) {
       LOG.error("Cannot sync as there is no other JN available for sync.");
       return false;
@@ -310,12 +315,24 @@ private List<InetSocketAddress> getOtherJournalNodeAddrs() {
     return null;
   }
 
-  private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
+  @VisibleForTesting
+  protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
       URISyntaxException,
       IOException {
     URI uri = new URI(uriStr);
-    return Util.getLoggerAddresses(uri,
-        Sets.newHashSet(jn.getBoundIpcAddress()));
+
+    InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
+    Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
+    List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded);
+
+    // Exclude the current JournalNode instance (a local address and the same port). If the address
+    // is bound to a local address on the same port, then remove it to handle scenarios where a
+    // wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses
+    // since we may be running multiple servers on the same host.
+    addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
+        && boundIpcAddress.getPort() == addr.getPort());
+
+    return addrList;
   }
 
   private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,

TestJournalNodeSync.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@
 import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
     .getLogFile;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -96,12 +99,45 @@ public void shutDownMiniCluster() throws IOException {
     }
   }
 
+  /**
+   * Test that the "self exclusion" works when there are multiple JournalNode instances running on
+   * the same server, but on different ports.
+   */
+  @Test
+  public void testJournalNodeExcludesSelfMultilpePorts() throws URISyntaxException, IOException {
+    String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+
+    JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+    // Test: Get the Journal address list for the default configuration
+    List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);
+
+    // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+    assertEquals(2, addrList.size());
+  }
+
+  /**
+   * Test that the "self exclusion" works when a host uses a wildcard address.
+   */
+  @Test
+  public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
+    String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+
+    JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+    // Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly
+    // used as a bind host.
+    String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0");
+
+    List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);
+
+    // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+    assertEquals(2, boundHostAddrList.size());
+  }
+
   @Test(timeout=30000)
   public void testJournalNodeSync() throws Exception {
 
     //As by default 3 journal nodes are started;
     for(int i=0; i<3; i++) {
-      Assert.assertEquals(true,
+      assertEquals(true,
           jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
     }
@@ -386,13 +422,13 @@ public void testSyncDuringRollingUpgrade() throws Exception {
         HdfsConstants.RollingUpgradeAction.PREPARE);
 
     //query rolling upgrade
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
         HdfsConstants.RollingUpgradeAction.QUERY));
 
     // Restart the Standby NN with rollingUpgrade option
     dfsCluster.restartNameNode(standbyNNindex, true,
         "-rollingUpgrade", "started");
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
        HdfsConstants.RollingUpgradeAction.QUERY));
 
     // Do some edits and delete some edit logs
@@ -420,7 +456,7 @@ public void testSyncDuringRollingUpgrade() throws Exception {
     // Restart the current standby NN (previously active)
     dfsCluster.restartNameNode(standbyNNindex, true,
         "-rollingUpgrade", "started");
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
         HdfsConstants.RollingUpgradeAction.QUERY));
 
     dfsCluster.waitActive();