HDFS-16331. Make dfs.blockreport.intervalMsec reconfigurable (#3676)

Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
(cherry picked from commit 52ec65fd10)
This commit is contained in:
litao 2021-12-03 13:12:05 +08:00 committed by Takanobu Asanuma
parent 831c11c47a
commit cdaf4d89f9
5 changed files with 190 additions and 62 deletions

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@ -1139,7 +1140,7 @@ static class Scheduler {
private final long heartbeatIntervalMs;
private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
private volatile long blockReportIntervalMs;
private final long outliersReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
@ -1296,6 +1297,15 @@ void setNextBlockReportTime(long nextBlockReportTime) {
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
}
/**
 * Returns the block report interval currently held by this scheduler.
 *
 * @return the current value of {@code blockReportIntervalMs}
 *         (milliseconds, going by the field name)
 */
long getBlockReportIntervalMs() {
  return blockReportIntervalMs;
}
/**
 * Replaces the block report interval used by this scheduler.
 *
 * @param intervalMs the new interval (milliseconds, going by the name);
 *                   must be strictly greater than zero
 * @throws IllegalArgumentException if {@code intervalMs} is zero or negative
 */
void setBlockReportIntervalMs(long intervalMs) {
  // Give the rejection a message: this exception ends up as the cause of the
  // ReconfigurationException reported to the operator, so it should say
  // which setting was refused and why.
  Preconditions.checkArgument(intervalMs > 0,
      "dfs.blockreport.intervalMsec should be larger than 0");
  this.blockReportIntervalMs = intervalMs;
}
/**
* Wrapped for testing.
* @return

View File

@ -74,6 +74,7 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.util.Preconditions;
import java.util.concurrent.TimeUnit;
@ -105,7 +106,7 @@ public class DNConf {
final long readaheadLength;
final long heartBeatInterval;
private final long lifelineIntervalMs;
final long blockReportInterval;
volatile long blockReportInterval;
final long blockReportSplitThreshold;
final boolean peerStatsEnabled;
final boolean diskStatsEnabled;
@ -474,4 +475,13 @@ public boolean getPmemCacheRecoveryEnabled() {
/**
 * Returns the process-commands threshold held by this configuration object.
 *
 * @return the value of {@code processCommandsThresholdMs}
 *         (milliseconds, going by the field name)
 */
public long getProcessCommandsThresholdMs() {
  return this.processCommandsThresholdMs;
}
/**
 * Replaces the block report interval stored in this configuration object.
 *
 * @param intervalMs the new interval (milliseconds, going by the name);
 *                   must be strictly greater than zero
 * @throws IllegalArgumentException if {@code intervalMs} is zero or negative
 */
void setBlockReportInterval(long intervalMs) {
  // A descriptive message makes the failure self-explanatory when it is
  // surfaced as the cause of a reconfiguration error.
  Preconditions.checkArgument(intervalMs > 0,
      "dfs.blockreport.intervalMsec should be larger than 0");
  blockReportInterval = intervalMs;
}
/**
 * Returns the block report interval held by this configuration object.
 *
 * @return the current value of {@code blockReportInterval}
 */
public long getBlockReportInterval() {
  return this.blockReportInterval;
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
@ -304,7 +306,8 @@ public class DataNode extends ReconfigurableBase
Collections.unmodifiableList(
Arrays.asList(
DFS_DATANODE_DATA_DIR_KEY,
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
@ -540,78 +543,111 @@ protected Configuration getNewConf() {
public String reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException {
switch (property) {
case DFS_DATANODE_DATA_DIR_KEY: {
IOException rootException = null;
case DFS_DATANODE_DATA_DIR_KEY: {
IOException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
this.refreshVolumes(newVal);
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
} catch (IOException e) {
rootException = e;
} finally {
// Send a full block report to let NN acknowledge the volume changes.
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
this.refreshVolumes(newVal);
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
triggerBlockReport(
new BlockReportOptions.Factory().setIncremental(false).build());
} catch (IOException e) {
rootException = e;
LOG.warn("Exception while sending the block report after refreshing"
+ " volumes {} to {}", property, newVal, e);
if (rootException == null) {
rootException = e;
}
} finally {
// Send a full block report to let NN acknowledge the volume changes.
try {
triggerBlockReport(
new BlockReportOptions.Factory().setIncremental(false).build());
} catch (IOException e) {
LOG.warn("Exception while sending the block report after refreshing"
+ " volumes {} to {}", property, newVal, e);
if (rootException == null) {
rootException = e;
}
} finally {
if (rootException != null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property), rootException);
}
if (rootException != null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property), rootException);
}
}
break;
}
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
ReconfigurationException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
int movers;
if (newVal == null) {
// set to default
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
} else {
movers = Integer.parseInt(newVal);
if (movers <= 0) {
rootException = new ReconfigurationException(
property,
newVal,
getConf().get(property),
new IllegalArgumentException(
"balancer max concurrent movers must be larger than 0"));
}
}
boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
if (!success) {
break;
}
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
ReconfigurationException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
int movers;
if (newVal == null) {
// set to default
movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
} else {
movers = Integer.parseInt(newVal);
if (movers <= 0) {
rootException = new ReconfigurationException(
property,
newVal,
getConf().get(property),
new IllegalArgumentException(
"Could not modify concurrent moves thread count"));
}
return Integer.toString(movers);
} catch (NumberFormatException nfe) {
rootException = new ReconfigurationException(
property, newVal, getConf().get(property), nfe);
} finally {
if (rootException != null) {
LOG.warn(String.format(
"Exception in updating balancer max concurrent movers %s to %s",
property, newVal), rootException);
throw rootException;
"balancer max concurrent movers must be larger than 0"));
}
}
break;
boolean success = xserver.updateBalancerMaxConcurrentMovers(movers);
if (!success) {
rootException = new ReconfigurationException(
property,
newVal,
getConf().get(property),
new IllegalArgumentException(
"Could not modify concurrent moves thread count"));
}
return Integer.toString(movers);
} catch (NumberFormatException nfe) {
rootException = new ReconfigurationException(
property, newVal, getConf().get(property), nfe);
} finally {
if (rootException != null) {
LOG.warn(String.format(
"Exception in updating balancer max concurrent movers %s to %s",
property, newVal), rootException);
throw rootException;
}
}
default:
break;
break;
}
case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: {
ReconfigurationException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
long intervalMs;
if (newVal == null) {
// Set to default.
intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
} else {
intervalMs = Long.parseLong(newVal);
}
dnConf.setBlockReportInterval(intervalMs);
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
actor.getScheduler().setBlockReportIntervalMs(intervalMs);
}
}
}
return Long.toString(intervalMs);
} catch (IllegalArgumentException e) {
rootException = new ReconfigurationException(
property, newVal, getConf().get(property), e);
} finally {
if (rootException != null) {
LOG.warn(String.format(
"Exception in updating block report interval %s to %s",
property, newVal), rootException);
throw rootException;
}
}
break;
}
default:
break;
}
throw new ReconfigurationException(
property, newVal, getConf().get(property));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.junit.Assert.assertEquals;
@ -293,4 +295,74 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration(
assertEquals("should not be able to get thread quota", false,
dataNode.xserver.balanceThrottler.acquire());
}
/**
 * Checks runtime reconfiguration of the block report interval on every
 * DataNode in the cluster: a non-numeric value and a negative value are
 * rejected, a valid value is applied to both the DataNode configuration and
 * every actor's scheduler, and a {@code null} value restores the default
 * without leaving the key set in the configuration.
 */
@Test
public void testBlockReportIntervalReconfiguration()
    throws ReconfigurationException, IOException {
  int blockReportInterval = 300 * 1000;
  for (int i = 0; i < NUM_DATA_NODE; i++) {
    DataNode dn = cluster.getDataNodes().get(i);

    // Try invalid values.
    try {
      dn.reconfigureProperty(
          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text");
      fail("ReconfigurationException expected");
    } catch (ReconfigurationException expected) {
      assertTrue("expecting NumberFormatException",
          expected.getCause() instanceof NumberFormatException);
    }
    try {
      dn.reconfigureProperty(
          DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
          String.valueOf(-1));
      fail("ReconfigurationException expected");
    } catch (ReconfigurationException expected) {
      assertTrue("expecting IllegalArgumentException",
          expected.getCause() instanceof IllegalArgumentException);
    }

    // Change properties and verify the change.
    dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
        String.valueOf(blockReportInterval));
    assertBlockReportIntervalEquals(dn, blockReportInterval);

    // Revert to default and verify the default.
    dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
        null);
    assertBlockReportIntervalEquals(dn,
        DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);

    // Reverting must not leave an explicit value in the configuration.
    assertEquals(String.format("expect %s is not configured",
        DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn
        .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY));
  }
}

/**
 * Asserts that the given DataNode's configuration and the scheduler of each
 * of its block pool service actors all report the expected interval.
 *
 * @param dn the DataNode to inspect
 * @param expectedIntervalMs the interval every holder is expected to report
 */
private static void assertBlockReportIntervalEquals(DataNode dn,
    long expectedIntervalMs) {
  String message = String.format("%s has wrong value",
      DFS_BLOCKREPORT_INTERVAL_MSEC_KEY);
  assertEquals(message, expectedIntervalMs,
      dn.getDnConf().getBlockReportInterval());
  for (BPOfferService bpos : dn.getAllBpOs()) {
    if (bpos != null) {
      for (BPServiceActor actor : bpos.getBPServiceActors()) {
        assertEquals(message, expectedIntervalMs,
            actor.getScheduler().getBlockReportIntervalMs());
      }
    }
  }
}
}

View File

@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(3, outs.size());
assertEquals(4, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}