HDFS-10207. Support enable Hadoop IPC backoff without namenode restart. Contributed by Xiaobing Zhou.

This commit is contained in:
Xiaoyu Yao 2016-04-21 10:18:48 -07:00
parent 7da5847cf1
commit b4be288c5d
7 changed files with 173 additions and 84 deletions

View File

@ -51,7 +51,7 @@ static Class<? extends RpcScheduler> convertSchedulerClass(
return (Class<? extends RpcScheduler>)schedulerClass;
}
private final boolean clientBackOffEnabled;
private volatile boolean clientBackOffEnabled;
// Atomic refs point to active callQueue
// We have two so we can better control swapping
@ -185,6 +185,10 @@ int getPriorityLevel(Schedulable e) {
return scheduler.getPriorityLevel(e);
}
/**
 * Updates the {@code clientBackOffEnabled} flag of this call queue manager.
 * The flag is declared {@code volatile} in this change, so the assignment
 * below is made without any additional locking.
 *
 * @param value the new value of the flag
 */
void setClientBackoffEnabled(boolean value) {
clientBackOffEnabled = value;
}
/**
* Insert e into the backing queue or block until we can.
* If we block and the queue changes on us, we will insert while the

View File

@ -2921,7 +2921,15 @@ public int getNumOpenConnections() {
public int getCallQueueLen() {
return callQueue.size();
}
/**
 * Reports the client backoff setting by delegating to {@code callQueue}.
 *
 * @return whatever {@code callQueue.isClientBackoffEnabled()} returns
 */
public boolean isClientBackoffEnabled() {
return callQueue.isClientBackoffEnabled();
}
/**
 * Changes the client backoff setting by forwarding the value to
 * {@code callQueue}.
 *
 * @param value the new setting, passed unchanged to
 *     {@code callQueue.setClientBackoffEnabled(boolean)}
 */
public void setClientBackoffEnabled(boolean value) {
callQueue.setClientBackoffEnabled(value);
}
/**
* The maximum size of the rpc call queue of this server.
* @return The maximum size of the rpc call queue.

View File

@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

View File

@ -21,6 +21,7 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -97,8 +98,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -155,6 +156,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_NAMESPACE;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
/**********************************************************
* NameNode serves as both directory namespace manager and
@ -274,13 +278,15 @@ public static enum OperationCategory {
DFS_HA_AUTO_FAILOVER_ENABLED_KEY
};
private String ipcClientRPCBackoffEnable;
/** A list of properties that are reconfigurable at runtime. */
static final List<String> RECONFIGURABLE_PROPERTIES = Collections
.unmodifiableList(Arrays
.asList(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
FS_PROTECTED_DIRECTORIES,
HADOOP_CALLER_CONTEXT_ENABLED_KEY));
private final TreeSet<String> reconfigurableProperties = Sets
.newTreeSet(Lists.newArrayList(
DFS_HEARTBEAT_INTERVAL_KEY,
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
FS_PROTECTED_DIRECTORIES,
HADOOP_CALLER_CONTEXT_ENABLED_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@ -702,6 +708,9 @@ protected void initialize(Configuration conf) throws IOException {
loadNamesystem(conf);
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
@ -719,6 +728,18 @@ protected void initialize(Configuration conf) throws IOException {
startMetricsLogger(conf);
}
/**
 * Derives the port-specific IPC backoff key from the client RPC server's
 * port, stores it in {@code ipcClientRPCBackoffEnable} and adds it to
 * {@code reconfigurableProperties}.
 */
private void initReconfigurableBackoffKey() {
  final int clientRpcPort = rpcServer.getClientRpcServer().getPort();
  ipcClientRPCBackoffEnable = buildBackoffEnableKey(clientRpcPort);
  reconfigurableProperties.add(ipcClientRPCBackoffEnable);
}
/**
 * Builds the port-qualified backoff enable key, e.g.
 * {@code ipc.8020.backoff.enable}.
 *
 * <p>The key is assembled by plain concatenation rather than
 * {@code String.format("%s.%d.%s", ...)}: {@code %d} is formatted with the
 * JVM's default locale and may render the port with locale-specific digits,
 * which would produce a key that never matches the one written in the
 * configuration. Integer-to-string concatenation always yields ASCII digits.
 *
 * @param port the RPC server port to embed in the key
 * @return {@code IPC_NAMESPACE + "." + port + "." + IPC_BACKOFF_ENABLE}
 */
static String buildBackoffEnableKey(final int port) {
  return IPC_NAMESPACE + "." + port + "." + IPC_BACKOFF_ENABLE;
}
/**
* Start a timer to periodically write NameNode metrics to the log
* file. This behavior can be disabled by configuration.
@ -1956,7 +1977,7 @@ void checkHaStateChange(StateChangeRequestInfo req)
* */
@Override // ReconfigurableBase
public Collection<String> getReconfigurableProperties() {
  // Return the per-instance set: initReconfigurableBackoffKey() adds the
  // port-specific backoff key to it, which a static list cannot contain.
  // (A second, unreachable return statement was removed here.)
  return reconfigurableProperties;
}
/*
@ -1968,57 +1989,67 @@ protected String reconfigurePropertyImpl(String property, String newVal)
final DatanodeManager datanodeManager = namesystem.getBlockManager()
.getDatanodeManager();
switch (property) {
case DFS_HEARTBEAT_INTERVAL_KEY:
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager.setHeartbeatInterval(DFS_HEARTBEAT_INTERVAL_DEFAULT);
return String.valueOf(DFS_HEARTBEAT_INTERVAL_DEFAULT);
} else {
datanodeManager.setHeartbeatInterval(Long.parseLong(newVal));
return String.valueOf(datanodeManager.getHeartbeatInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock();
LOG.info("RECONFIGURE* changed heartbeatInterval to "
+ datanodeManager.getHeartbeatInterval());
}
case DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY:
namesystem.writeLock();
try {
if (newVal == null) {
// set to default
datanodeManager
.setHeartbeatRecheckInterval(
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
return String
.valueOf(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
} else {
datanodeManager.setHeartbeatRecheckInterval(Integer.parseInt(newVal));
return String.valueOf(datanodeManager.getHeartbeatRecheckInterval());
}
} catch (NumberFormatException nfe) {
throw new ReconfigurationException(property, newVal, getConf().get(
property), nfe);
} finally {
namesystem.writeUnlock();
LOG.info("RECONFIGURE* changed heartbeatRecheckInterval to "
+ datanodeManager.getHeartbeatRecheckInterval());
}
case FS_PROTECTED_DIRECTORIES:
if (property.equals(DFS_HEARTBEAT_INTERVAL_KEY)) {
return reconfHeartbeatInterval(datanodeManager, property, newVal);
} else if (property.equals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY)) {
return reconfHeartbeatRecheckInterval(datanodeManager, property, newVal);
} else if (property.equals(FS_PROTECTED_DIRECTORIES)) {
return reconfProtectedDirectories(newVal);
case HADOOP_CALLER_CONTEXT_ENABLED_KEY:
} else if (property.equals(HADOOP_CALLER_CONTEXT_ENABLED_KEY)) {
return reconfCallerContextEnabled(newVal);
default:
break;
} else if (property.equals(ipcClientRPCBackoffEnable)) {
return reconfigureIPCBackoffEnabled(newVal);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
}
}
/**
 * Applies a new heartbeat interval to the given datanode manager while
 * holding the namesystem write lock.
 *
 * @param datanodeManager the manager whose heartbeat interval is updated
 * @param property the property name, used only when reporting a failure
 * @param newVal the new value as text, or {@code null} to apply
 *     {@code DFS_HEARTBEAT_INTERVAL_DEFAULT}
 * @return the effective value as a string
 * @throws ReconfigurationException if {@code newVal} is not a valid long
 */
private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
    final String property, final String newVal)
    throws ReconfigurationException {
  String effectiveValue;
  namesystem.writeLock();
  try {
    if (newVal == null) {
      // set to default
      datanodeManager.setHeartbeatInterval(DFS_HEARTBEAT_INTERVAL_DEFAULT);
      effectiveValue = String.valueOf(DFS_HEARTBEAT_INTERVAL_DEFAULT);
    } else {
      datanodeManager.setHeartbeatInterval(Long.parseLong(newVal));
      effectiveValue = String.valueOf(datanodeManager.getHeartbeatInterval());
    }
  } catch (NumberFormatException nfe) {
    throw new ReconfigurationException(property, newVal, getConf().get(
        property), nfe);
  } finally {
    namesystem.writeUnlock();
  }
  // Logged only after a successful update; previously this lived in the
  // finally block and claimed a change even when parsing had failed.
  LOG.info("RECONFIGURE* changed heartbeatInterval to "
      + datanodeManager.getHeartbeatInterval());
  return effectiveValue;
}
/**
 * Applies a new heartbeat recheck interval to the given datanode manager
 * while holding the namesystem write lock.
 *
 * @param datanodeManager the manager whose recheck interval is updated
 * @param property the property name, used only when reporting a failure
 * @param newVal the new value as text, or {@code null} to apply
 *     {@code DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT}
 * @return the effective value as a string
 * @throws ReconfigurationException if {@code newVal} is not a valid int
 */
private String reconfHeartbeatRecheckInterval(
    final DatanodeManager datanodeManager, final String property,
    final String newVal) throws ReconfigurationException {
  String effectiveValue;
  namesystem.writeLock();
  try {
    if (newVal == null) {
      // set to default
      datanodeManager.setHeartbeatRecheckInterval(
          DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
      effectiveValue =
          String.valueOf(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT);
    } else {
      datanodeManager.setHeartbeatRecheckInterval(Integer.parseInt(newVal));
      effectiveValue =
          String.valueOf(datanodeManager.getHeartbeatRecheckInterval());
    }
  } catch (NumberFormatException nfe) {
    throw new ReconfigurationException(property, newVal, getConf().get(
        property), nfe);
  } finally {
    namesystem.writeUnlock();
  }
  // Logged only after a successful update; previously this lived in the
  // finally block and claimed a change even when parsing had failed.
  // The unreachable trailing throw that followed the try statement is gone:
  // every path above either assigns the result or throws.
  LOG.info("RECONFIGURE* changed heartbeatRecheckInterval to "
      + datanodeManager.getHeartbeatRecheckInterval());
  return effectiveValue;
}
private String reconfProtectedDirectories(String newVal) {
@ -2036,6 +2067,18 @@ private String reconfCallerContextEnabled(String newVal) {
return Boolean.toString(callerContextEnabled);
}
/**
 * Applies a new client backoff setting to the client RPC server.
 *
 * @param newVal the new value as text, parsed with
 *     {@link Boolean#parseBoolean(String)}; {@code null} selects
 *     {@code IPC_BACKOFF_ENABLE_DEFAULT}
 * @return the applied setting as {@code "true"} or {@code "false"}
 */
String reconfigureIPCBackoffEnabled(String newVal) {
  final boolean enable = (newVal == null)
      ? IPC_BACKOFF_ENABLE_DEFAULT
      : Boolean.parseBoolean(newVal);
  rpcServer.getClientRpcServer().setClientBackoffEnabled(enable);
  return Boolean.toString(enable);
}
@Override // ReconfigurableBase
protected Configuration getNewConf() {
return new HdfsConfiguration();

View File

@ -2249,6 +2249,6 @@ public ReconfigurationTaskStatus getReconfigurationStatus()
public List<String> listReconfigurableProperties() throws IOException {
  checkNNStartup();
  namesystem.checkSuperuserPrivilege();
  // Copy the NameNode instance's current collection so the port-specific
  // backoff key registered at initialization is reported as well. The
  // earlier return of the static list made this statement unreachable.
  return Lists.newArrayList(nn.getReconfigurableProperties());
}
}

View File

@ -40,6 +40,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT;
public class TestNameNodeReconfigure {
@ -63,33 +64,15 @@ public void testReconfigureCallerContextEnabled()
// try invalid values
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "text");
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value", false,
nameSystem.getCallerContextEnabled());
assertEquals(
HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value",
false,
nameNode.getConf().getBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY,
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT));
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, false);
// enable CallerContext
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "true");
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value", true,
nameSystem.getCallerContextEnabled());
assertEquals(
HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value",
true,
nameNode.getConf().getBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY,
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT));
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, true);
// disable CallerContext
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, "false");
assertEquals(HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value", false,
nameSystem.getCallerContextEnabled());
assertEquals(
HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value",
false,
nameNode.getConf().getBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY,
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT));
verifyReconfigureCallerContextEnabled(nameNode, nameSystem, false);
// revert to default
nameNode.reconfigureProperty(HADOOP_CALLER_CONTEXT_ENABLED_KEY, null);
@ -101,11 +84,63 @@ public void testReconfigureCallerContextEnabled()
nameNode.getConf().get(HADOOP_CALLER_CONTEXT_ENABLED_KEY));
}
/**
 * Asserts that the caller-context flag equals {@code expected} both on the
 * namesystem obtained from {@code nameNode} and in the NameNode's
 * configuration.
 *
 * @param nameNode the NameNode under test
 * @param nameSystem not read by this helper; the namesystem is taken from
 *     {@code nameNode.getNamesystem()}
 * @param expected the value both sources must report
 */
void verifyReconfigureCallerContextEnabled(final NameNode nameNode,
    final FSNamesystem nameSystem, boolean expected) {
  final String message = HADOOP_CALLER_CONTEXT_ENABLED_KEY + " has wrong value";

  // live state of the namesystem
  assertEquals(message, expected,
      nameNode.getNamesystem().getCallerContextEnabled());

  // value recorded in the configuration
  final boolean confValue = nameNode.getConf().getBoolean(
      HADOOP_CALLER_CONTEXT_ENABLED_KEY, HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
  assertEquals(message, expected, confValue);
}
/**
* Test that we can modify configuration properties.
* Test to reconfigure enable/disable IPC backoff
*/
@Test
public void testReconfigureHearbeatCheck1() throws ReconfigurationException {
public void testReconfigureIPCBackoff() throws ReconfigurationException {
  final NameNode nameNode = cluster.getNameNode();
  NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer();
  String ipcClientRPCBackoffEnable = NameNode.buildBackoffEnableKey(nnrs
      .getClientRpcServer().getPort());

  // backoff is expected to be off before any reconfiguration
  verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
      false);

  // try invalid values: the value is parsed with Boolean.parseBoolean, so
  // text that is not "true" must leave backoff disabled
  nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, "text");
  verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
      false);

  // enable IPC_CLIENT_RPC_BACKOFF
  nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, "true");
  verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
      true);

  // disable IPC_CLIENT_RPC_BACKOFF
  nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, "false");
  verifyReconfigureIPCBackoff(nameNode, nnrs, ipcClientRPCBackoffEnable,
      false);

  // revert to default
  nameNode.reconfigureProperty(ipcClientRPCBackoffEnable, null);
  assertEquals(ipcClientRPCBackoffEnable + " has wrong value", false,
      nnrs.getClientRpcServer().isClientBackoffEnabled());
  assertEquals(ipcClientRPCBackoffEnable + " has wrong value", null,
      nameNode.getConf().get(ipcClientRPCBackoffEnable));
}
/**
 * Asserts that client backoff equals {@code expected} both on the client
 * RPC server and in the NameNode's configuration.
 *
 * @param nameNode the NameNode whose configuration is checked
 * @param nnrs the RPC server whose client RPC server is checked
 * @param property the port-specific backoff key
 * @param expected the value both sources must report
 */
void verifyReconfigureIPCBackoff(final NameNode nameNode,
    final NameNodeRpcServer nnrs, String property, boolean expected) {
  final String message = property + " has wrong value";

  // live state of the client RPC server
  assertEquals(message, expected,
      nnrs.getClientRpcServer().isClientBackoffEnabled());

  // value recorded in the configuration
  final boolean confValue =
      nameNode.getConf().getBoolean(property, IPC_BACKOFF_ENABLE_DEFAULT);
  assertEquals(message, expected, confValue);
}
/**
* Test to reconfigure interval of heart beat check and re-check.
*/
@Test
public void testReconfigureHearbeatCheck() throws ReconfigurationException {
final NameNode nameNode = cluster.getNameNode();
final DatanodeManager datanodeManager = nameNode.namesystem
.getBlockManager().getDatanodeManager();

View File

@ -234,7 +234,7 @@ public void testNameNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
assertEquals(5, outs.size());
assertEquals(6, outs.size());
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
assertEquals(errs.size(), 0);