HDFS-16398. Reconfig block report parameters for datanode (#3831)
(cherry picked from commit c2ff39006f
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
This commit is contained in:
parent
ff3a88b9c2
commit
ef1a2b478b
@ -107,12 +107,12 @@ public class DNConf {
|
||||
final long heartBeatInterval;
|
||||
private final long lifelineIntervalMs;
|
||||
volatile long blockReportInterval;
|
||||
final long blockReportSplitThreshold;
|
||||
volatile long blockReportSplitThreshold;
|
||||
final boolean peerStatsEnabled;
|
||||
final boolean diskStatsEnabled;
|
||||
final long outliersReportIntervalMs;
|
||||
final long ibrInterval;
|
||||
final long initialBlockReportDelayMs;
|
||||
volatile long initialBlockReportDelayMs;
|
||||
volatile long cacheReportInterval;
|
||||
final long datanodeSlowIoWarningThresholdMs;
|
||||
|
||||
@ -215,19 +215,7 @@ public DNConf(final Configurable dn) {
|
||||
this.datanodeSlowIoWarningThresholdMs = getConf().getLong(
|
||||
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
|
||||
|
||||
long initBRDelay = getConf().getTimeDuration(
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT,
|
||||
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
||||
if (initBRDelay >= blockReportInterval) {
|
||||
initBRDelay = 0;
|
||||
DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY + " is "
|
||||
+ "greater than or equal to" + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY
|
||||
+ ". Setting initial delay to 0 msec:");
|
||||
}
|
||||
initialBlockReportDelayMs = initBRDelay;
|
||||
|
||||
initBlockReportDelay();
|
||||
heartBeatInterval = getConf().getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY,
|
||||
DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS,
|
||||
TimeUnit.MILLISECONDS);
|
||||
@ -311,6 +299,19 @@ public DNConf(final Configurable dn) {
|
||||
);
|
||||
}
|
||||
|
||||
private void initBlockReportDelay() {
|
||||
long initBRDelay = getConf().getTimeDuration(
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT, TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
||||
if (initBRDelay >= blockReportInterval || initBRDelay < 0) {
|
||||
initBRDelay = 0;
|
||||
DataNode.LOG.info(DFS_BLOCKREPORT_INITIAL_DELAY_KEY +
|
||||
" is greater than or equal to " + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY +
|
||||
". Setting initial delay to 0 msec.");
|
||||
}
|
||||
initialBlockReportDelayMs = initBRDelay;
|
||||
}
|
||||
|
||||
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
|
||||
String getMinimumNameNodeVersion() {
|
||||
return this.minimumNameNodeVersion;
|
||||
@ -477,7 +478,8 @@ public long getProcessCommandsThresholdMs() {
|
||||
}
|
||||
|
||||
void setBlockReportInterval(long intervalMs) {
|
||||
Preconditions.checkArgument(intervalMs > 0);
|
||||
Preconditions.checkArgument(intervalMs > 0,
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
|
||||
blockReportInterval = intervalMs;
|
||||
}
|
||||
|
||||
@ -487,11 +489,22 @@ public long getBlockReportInterval() {
|
||||
|
||||
void setCacheReportInterval(long intervalMs) {
|
||||
Preconditions.checkArgument(intervalMs > 0,
|
||||
"dfs.cachereport.intervalMsec should be larger than 0");
|
||||
DFS_CACHEREPORT_INTERVAL_MSEC_KEY + " should be larger than 0");
|
||||
cacheReportInterval = intervalMs;
|
||||
}
|
||||
|
||||
public long getCacheReportInterval() {
|
||||
return cacheReportInterval;
|
||||
}
|
||||
|
||||
void setBlockReportSplitThreshold(long threshold) {
|
||||
Preconditions.checkArgument(threshold >= 0,
|
||||
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY + " should be larger than or equal to 0");
|
||||
blockReportSplitThreshold = threshold;
|
||||
}
|
||||
|
||||
void setInitBRDelayMs(String delayMs) {
|
||||
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
|
||||
initBlockReportDelay();
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,14 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
@ -312,6 +316,8 @@ public class DataNode extends ReconfigurableBase
|
||||
DFS_DATANODE_DATA_DIR_KEY,
|
||||
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
|
||||
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
|
||||
DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
|
||||
|
||||
@ -619,39 +625,10 @@ public String reconfigurePropertyImpl(String property, String newVal)
|
||||
}
|
||||
break;
|
||||
}
|
||||
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
|
||||
ReconfigurationException rootException = null;
|
||||
try {
|
||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||
long intervalMs;
|
||||
if (newVal == null) {
|
||||
// Set to default.
|
||||
intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||
} else {
|
||||
intervalMs = Long.parseLong(newVal);
|
||||
}
|
||||
dnConf.setBlockReportInterval(intervalMs);
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
return Long.toString(intervalMs);
|
||||
} catch (IllegalArgumentException e) {
|
||||
rootException = new ReconfigurationException(
|
||||
property, newVal, getConf().get(property), e);
|
||||
} finally {
|
||||
if (rootException != null) {
|
||||
LOG.warn(String.format(
|
||||
"Exception in updating block report interval %s to %s",
|
||||
property, newVal), rootException);
|
||||
throw rootException;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY:
|
||||
case DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY:
|
||||
case DFS_BLOCKREPORT_INITIAL_DELAY_KEY:
|
||||
return reconfBlockReportParameters(property, newVal);
|
||||
case DFS_DATANODE_MAX_RECEIVER_THREADS_KEY:
|
||||
return reconfDataXceiverParameters(property, newVal);
|
||||
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
|
||||
@ -697,6 +674,44 @@ private String reconfCacheReportParameters(String property, String newVal)
|
||||
}
|
||||
}
|
||||
|
||||
private String reconfBlockReportParameters(String property, String newVal)
|
||||
throws ReconfigurationException {
|
||||
String result = null;
|
||||
try {
|
||||
LOG.info("Reconfiguring {} to {}", property, newVal);
|
||||
if (property.equals(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)) {
|
||||
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||
long intervalMs = newVal == null ? DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT :
|
||||
Long.parseLong(newVal);
|
||||
result = Long.toString(intervalMs);
|
||||
dnConf.setBlockReportInterval(intervalMs);
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (property.equals(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY)) {
|
||||
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||
long threshold = newVal == null ? DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT :
|
||||
Long.parseLong(newVal);
|
||||
result = Long.toString(threshold);
|
||||
dnConf.setBlockReportSplitThreshold(threshold);
|
||||
} else if (property.equals(DFS_BLOCKREPORT_INITIAL_DELAY_KEY)) {
|
||||
Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
|
||||
int initialDelay = newVal == null ? DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT :
|
||||
Integer.parseInt(newVal);
|
||||
result = Integer.toString(initialDelay);
|
||||
dnConf.setInitBRDelayMs(result);
|
||||
}
|
||||
LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
|
||||
return result;
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of the keys of the re-configurable properties in configuration.
|
||||
*/
|
||||
@ -3823,4 +3838,9 @@ private static boolean isWrite(BlockConstructionStage stage) {
|
||||
return (stage == PIPELINE_SETUP_STREAMING_RECOVERY
|
||||
|| stage == PIPELINE_SETUP_APPEND_RECOVERY);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockPoolManager getBlockPoolManager() {
|
||||
return blockPoolManager;
|
||||
}
|
||||
}
|
||||
|
@ -18,8 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
|
||||
@ -303,40 +305,49 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(
|
||||
|
||||
@Test
|
||||
public void testBlockReportIntervalReconfiguration()
|
||||
throws ReconfigurationException, IOException {
|
||||
throws ReconfigurationException {
|
||||
int blockReportInterval = 300 * 1000;
|
||||
String[] blockReportParameters = {
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
|
||||
DFS_BLOCKREPORT_INITIAL_DELAY_KEY};
|
||||
|
||||
for (int i = 0; i < NUM_DATA_NODE; i++) {
|
||||
DataNode dn = cluster.getDataNodes().get(i);
|
||||
BlockPoolManager blockPoolManager = dn.getBlockPoolManager();
|
||||
|
||||
// Try invalid values.
|
||||
try {
|
||||
dn.reconfigureProperty(
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting NumberFormatException",
|
||||
expected.getCause() instanceof NumberFormatException);
|
||||
for (String blockReportParameter : blockReportParameters) {
|
||||
try {
|
||||
dn.reconfigureProperty(blockReportParameter, "text");
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting NumberFormatException",
|
||||
expected.getCause() instanceof NumberFormatException);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
dn.reconfigureProperty(
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
String.valueOf(-1));
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, String.valueOf(-1));
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting IllegalArgumentException",
|
||||
expected.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
try {
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(-1));
|
||||
fail("ReconfigurationException expected");
|
||||
} catch (ReconfigurationException expected) {
|
||||
assertTrue("expecting IllegalArgumentException",
|
||||
expected.getCause() instanceof IllegalArgumentException);
|
||||
}
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, String.valueOf(-1));
|
||||
assertEquals(0, dn.getDnConf().initialBlockReportDelayMs);
|
||||
|
||||
// Change properties.
|
||||
// Change properties and verify the change.
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
String.valueOf(blockReportInterval));
|
||||
|
||||
// Verify change.
|
||||
assertEquals(String.format("%s has wrong value",
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||
blockReportInterval,
|
||||
dn.getDnConf().getBlockReportInterval());
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
assertEquals(String.format("%s has wrong value",
|
||||
@ -347,15 +358,15 @@ public void testBlockReportIntervalReconfiguration()
|
||||
}
|
||||
}
|
||||
|
||||
// Revert to default.
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||
null);
|
||||
assertEquals(String.format("%s has wrong value",
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT,
|
||||
dn.getDnConf().getBlockReportInterval());
|
||||
// Verify default.
|
||||
for (BPOfferService bpos : dn.getAllBpOs()) {
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, String.valueOf(123));
|
||||
assertEquals(123, dn.getDnConf().blockReportSplitThreshold);
|
||||
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "123");
|
||||
assertEquals(123000, dn.getDnConf().initialBlockReportDelayMs);
|
||||
|
||||
// Revert to default and verify default.
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, null);
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
if (bpos != null) {
|
||||
for (BPServiceActor actor : bpos.getBPServiceActors()) {
|
||||
assertEquals(String.format("%s has wrong value",
|
||||
@ -365,9 +376,16 @@ public void testBlockReportIntervalReconfiguration()
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals(String.format("expect %s is not configured",
|
||||
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
|
||||
.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
|
||||
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INTERVAL_MSEC_KEY),
|
||||
dn.getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
|
||||
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, null);
|
||||
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY),
|
||||
dn.getConf().get(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY));
|
||||
|
||||
dn.reconfigureProperty(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, null);
|
||||
assertNull(String.format("expect %s is not configured", DFS_BLOCKREPORT_INITIAL_DELAY_KEY),
|
||||
dn.getConf().get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
|
||||
final List<String> outs = Lists.newArrayList();
|
||||
final List<String> errs = Lists.newArrayList();
|
||||
getReconfigurableProperties("datanode", address, outs, errs);
|
||||
assertEquals(6, outs.size());
|
||||
assertEquals(8, outs.size());
|
||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user