HDFS-16400. Reconfig DataXceiver parameters for datanode (#3843)

Reviewed-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit f02374df92)
This commit is contained in:
litao 2022-01-14 13:48:10 +08:00 committed by Takanobu Asanuma
parent cdaf4d89f9
commit 11fe5279b0
4 changed files with 77 additions and 4 deletions

View File

@ -36,6 +36,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
@ -307,7 +309,8 @@ public class DataNode extends ReconfigurableBase
Arrays.asList( Arrays.asList(
DFS_DATANODE_DATA_DIR_KEY, DFS_DATANODE_DATA_DIR_KEY,
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
@ -646,6 +649,8 @@ public String reconfigurePropertyImpl(String property, String newVal)
} }
break; break;
} }
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
return reconfDataXceiverParameters(property, newVal);
default: default:
break; break;
} }
@ -653,6 +658,23 @@ public String reconfigurePropertyImpl(String property, String newVal)
property, newVal, getConf().get(property)); property, newVal, getConf().get(property));
} }
private String reconfDataXceiverParameters(String property, String newVal)
throws ReconfigurationException {
String result;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
Preconditions.checkNotNull(getXferServer(), "DataXceiverServer has not been initialized.");
int threads = (newVal == null ? DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT :
Integer.parseInt(newVal));
result = Integer.toString(threads);
getXferServer().setMaxXceiverCount(threads);
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}
/** /**
* Get a list of the keys of the re-configurable properties in configuration. * Get a list of the keys of the re-configurable properties in configuration.
*/ */

View File

@ -68,8 +68,7 @@ class DataXceiverServer implements Runnable {
* Enforcing the limit is required in order to avoid data-node * Enforcing the limit is required in order to avoid data-node
* running out of memory. * running out of memory.
*/ */
int maxXceiverCount = volatile int maxXceiverCount;
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
/** /**
* A manager to make sure that cluster balancing does not take too much * A manager to make sure that cluster balancing does not take too much
@ -514,4 +513,15 @@ public boolean updateBalancerMaxConcurrentMovers(final int movers) {
void setMaxReconfigureWaitTime(int max) { void setMaxReconfigureWaitTime(int max) {
this.maxReconfigureWaitTime = max; this.maxReconfigureWaitTime = max;
} }
public void setMaxXceiverCount(int xceiverCount) {
Preconditions.checkArgument(xceiverCount > 0,
"dfs.datanode.max.transfer.threads should be larger than 0");
maxXceiverCount = xceiverCount;
}
@VisibleForTesting
public int getMaxXceiverCount() {
return maxXceiverCount;
}
} }

View File

@ -22,7 +22,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -365,4 +368,42 @@ public void testBlockReportIntervalReconfiguration()
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
} }
} }
@Test
public void testDataXceiverReconfiguration()
throws ReconfigurationException {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);
// Try invalid values.
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "text");
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting NumberFormatException",
expected.getCause() instanceof NumberFormatException);
}
try {
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(-1));
fail("ReconfigurationException expected");
} catch (ReconfigurationException expected) {
assertTrue("expecting IllegalArgumentException",
expected.getCause() instanceof IllegalArgumentException);
}
// Change properties and verify change.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, String.valueOf(123));
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
123, dn.getXferServer().getMaxXceiverCount());
// Revert to default.
dn.reconfigureProperty(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, null);
assertEquals(String.format("%s has wrong value", DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT, dn.getXferServer().getMaxXceiverCount());
assertNull(String.format("expect %s is not configured",
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY),
dn.getConf().get(DFS_DATANODE_MAX_RECEIVER_THREADS_KEY));
}
}
} }

View File

@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList(); final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList(); final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs); getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(4, outs.size()); assertEquals(5, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
} }