HDFS-15281. Make sure ZKFC uses dfs.namenode.rpc-address to bind to host address (#1964)

Contributed by Dhiraj Hegde.

Signed-off-by: Mingliang Liu <liuml07@apache.org>
Signed-off-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
Dhiraj 2020-04-25 13:04:32 -07:00 committed by Mingliang Liu
parent c2769384ac
commit 1c19107ce8
No known key found for this signature in database
GPG Key ID: BC2FB8C6908A0C16
3 changed files with 49 additions and 6 deletions

View File

@ -318,9 +318,10 @@ private void initHM() {
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks()); healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start(); healthMonitor.start();
} }
protected void initRPC() throws IOException { protected void initRPC() throws IOException {
InetSocketAddress bindAddr = getRpcAddressToBindTo(); InetSocketAddress bindAddr = getRpcAddressToBindTo();
LOG.info("ZKFC RpcServer binding to {}", bindAddr);
rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider()); rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
} }

View File

@ -111,21 +111,39 @@ protected byte[] targetToData(HAServiceTarget target) {
@Override @Override
protected InetSocketAddress getRpcAddressToBindTo() { protected InetSocketAddress getRpcAddressToBindTo() {
int zkfcPort = getZkfcPort(conf); int zkfcPort = getZkfcPort(conf);
return new InetSocketAddress(localTarget.getAddress().getAddress(), String zkfcBindAddr = getZkfcServerBindHost(conf);
zkfcPort); if (zkfcBindAddr == null || zkfcBindAddr.isEmpty()) {
zkfcBindAddr = localTarget.getAddress().getAddress().getHostAddress();
}
return new InetSocketAddress(zkfcBindAddr, zkfcPort);
} }
@Override @Override
protected PolicyProvider getPolicyProvider() { protected PolicyProvider getPolicyProvider() {
return new HDFSPolicyProvider(); return new HDFSPolicyProvider();
} }
static int getZkfcPort(Configuration conf) { static int getZkfcPort(Configuration conf) {
return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY, return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT); DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
} }
/**
* Given a configuration get the bind host that could be used by ZKFC.
* We derive it from NN service rpc bind host or NN rpc bind host.
*
* @param conf input configuration
* @return the bind host address found in conf
*/
private static String getZkfcServerBindHost(Configuration conf) {
String addr = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY);
if (addr == null || addr.isEmpty()) {
addr = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY);
}
return addr;
}
public static DFSZKFailoverController create(Configuration conf) { public static DFSZKFailoverController create(Configuration conf) {
Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf); Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
String nsId = DFSUtil.getNamenodeNameServiceId(conf); String nsId = DFSUtil.getNamenodeNameServiceId(conf);

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.tools; package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -57,6 +58,8 @@
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
public class TestDFSZKFailoverController extends ClientBaseWithFixes { public class TestDFSZKFailoverController extends ClientBaseWithFixes {
private static final String LOCALHOST_SERVER_ADDRESS = "127.0.0.1";
private static final String WILDCARD_ADDRESS = "0.0.0.0";
private Configuration conf; private Configuration conf;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private TestContext ctx; private TestContext ctx;
@ -207,6 +210,27 @@ public void testManualFailover() throws Exception {
waitForHAState(1, HAServiceState.STANDBY); waitForHAState(1, HAServiceState.STANDBY);
} }
@Test(timeout=30000)
public void testWithoutBindAddressSet() throws Exception {
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
conf);
assertEquals("Bind address not expected to be wildcard by default.",
zkfc.getRpcAddressToBindTo().getHostString(),
LOCALHOST_SERVER_ADDRESS);
}
@Test(timeout=30000)
public void testWithBindAddressSet() throws Exception {
conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, WILDCARD_ADDRESS);
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
conf);
String addr = zkfc.getRpcAddressToBindTo().getHostString();
assertEquals("Bind address " + addr + " is not wildcard.",
addr, WILDCARD_ADDRESS);
}
/** /**
* Tests that a Namenode in Observer state rejects any state transition * Tests that a Namenode in Observer state rejects any state transition
* request from ZKFC, as a result of namenode's participation in the ZK * request from ZKFC, as a result of namenode's participation in the ZK