diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index d5fb080d72..8f0286c680 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -67,6 +67,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; @@ -1139,7 +1140,7 @@ static class Scheduler { private final long heartbeatIntervalMs; private final long lifelineIntervalMs; - private final long blockReportIntervalMs; + private volatile long blockReportIntervalMs; private final long outliersReportIntervalMs; Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs, @@ -1296,6 +1297,15 @@ void setNextBlockReportTime(long nextBlockReportTime) { this.nextBlockReportTime.getAndSet(nextBlockReportTime); } + long getBlockReportIntervalMs() { + return this.blockReportIntervalMs; + } + + void setBlockReportIntervalMs(long intervalMs) { + Preconditions.checkArgument(intervalMs > 0); + this.blockReportIntervalMs = intervalMs; + } + /** * Wrapped for testing. * @return diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index d61a17e83f..ae32df5e54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.util.Preconditions; import java.util.concurrent.TimeUnit; @@ -105,7 +106,7 @@ public class DNConf { final long readaheadLength; final long heartBeatInterval; private final long lifelineIntervalMs; - final long blockReportInterval; + volatile long blockReportInterval; final long blockReportSplitThreshold; final boolean peerStatsEnabled; final boolean diskStatsEnabled; @@ -474,4 +475,13 @@ public boolean getPmemCacheRecoveryEnabled() { public long getProcessCommandsThresholdMs() { return processCommandsThresholdMs; } + + void setBlockReportInterval(long intervalMs) { + Preconditions.checkArgument(intervalMs > 0); + blockReportInterval = intervalMs; + } + + public long getBlockReportInterval() { + return blockReportInterval; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c1507a4512..537a1c4e8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; @@ -304,7 +306,8 @@ public class DataNode extends ReconfigurableBase Collections.unmodifiableList( Arrays.asList( DFS_DATANODE_DATA_DIR_KEY, - DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)); + DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog"); @@ -540,78 +543,111 @@ protected Configuration getNewConf() { public String reconfigurePropertyImpl(String property, String newVal) throws ReconfigurationException { switch (property) { - case DFS_DATANODE_DATA_DIR_KEY: { - IOException rootException = null; + case DFS_DATANODE_DATA_DIR_KEY: { + IOException rootException = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + this.refreshVolumes(newVal); + return getConf().get(DFS_DATANODE_DATA_DIR_KEY); + } catch (IOException e) { + rootException = e; + } finally { + // Send a full block report to let NN acknowledge the volume changes. try { - LOG.info("Reconfiguring {} to {}", property, newVal); - this.refreshVolumes(newVal); - return getConf().get(DFS_DATANODE_DATA_DIR_KEY); + triggerBlockReport( + new BlockReportOptions.Factory().setIncremental(false).build()); } catch (IOException e) { - rootException = e; + LOG.warn("Exception while sending the block report after refreshing" + + " volumes {} to {}", property, newVal, e); + if (rootException == null) { + rootException = e; + } } finally { - // Send a full block report to let NN acknowledge the volume changes. - try { - triggerBlockReport( - new BlockReportOptions.Factory().setIncremental(false).build()); - } catch (IOException e) { - LOG.warn("Exception while sending the block report after refreshing" - + " volumes {} to {}", property, newVal, e); - if (rootException == null) { - rootException = e; - } - } finally { - if (rootException != null) { - throw new ReconfigurationException(property, newVal, - getConf().get(property), rootException); - } + if (rootException != null) { + throw new ReconfigurationException(property, newVal, + getConf().get(property), rootException); } } - break; } - case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: { - ReconfigurationException rootException = null; - try { - LOG.info("Reconfiguring {} to {}", property, newVal); - int movers; - if (newVal == null) { - // set to default - movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; - } else { - movers = Integer.parseInt(newVal); - if (movers <= 0) { - rootException = new ReconfigurationException( - property, - newVal, - getConf().get(property), - new IllegalArgumentException( - "balancer max concurrent movers must be larger than 0")); - } - } - boolean success = xserver.updateBalancerMaxConcurrentMovers(movers); - if (!success) { + break; + } + case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: { + ReconfigurationException rootException = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + int movers; + if (newVal == null) { + // set to default + movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; + } else { + movers = Integer.parseInt(newVal); + if (movers <= 0) { rootException = new ReconfigurationException( property, newVal, getConf().get(property), new IllegalArgumentException( - "Could not modify concurrent moves thread count")); - } - return Integer.toString(movers); - } catch (NumberFormatException nfe) { - rootException = new ReconfigurationException( - property, newVal, getConf().get(property), nfe); - } finally { - if (rootException != null) { - LOG.warn(String.format( - "Exception in updating balancer max concurrent movers %s to %s", - property, newVal), rootException); - throw rootException; + "balancer max concurrent movers must be larger than 0")); } } - break; + boolean success = xserver.updateBalancerMaxConcurrentMovers(movers); + if (!success) { + rootException = new ReconfigurationException( + property, + newVal, + getConf().get(property), + new IllegalArgumentException( + "Could not modify concurrent moves thread count")); + } + return Integer.toString(movers); + } catch (NumberFormatException nfe) { + rootException = new ReconfigurationException( + property, newVal, getConf().get(property), nfe); + } finally { + if (rootException != null) { + LOG.warn(String.format( + "Exception in updating balancer max concurrent movers %s to %s", + property, newVal), rootException); + throw rootException; + } } - default: - break; + break; + } + case DFS_BLOCKREPORT_INTERVAL_MSEC_KEY: { + ReconfigurationException rootException = null; + try { + LOG.info("Reconfiguring {} to {}", property, newVal); + long intervalMs; + if (newVal == null) { + // Set to default. + intervalMs = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; + } else { + intervalMs = Long.parseLong(newVal); + } + dnConf.setBlockReportInterval(intervalMs); + for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { + if (bpos != null) { + for (BPServiceActor actor : bpos.getBPServiceActors()) { + actor.getScheduler().setBlockReportIntervalMs(intervalMs); + } + } + } + return Long.toString(intervalMs); + } catch (IllegalArgumentException e) { + rootException = new ReconfigurationException( + property, newVal, getConf().get(property), e); + } finally { + if (rootException != null) { + LOG.warn(String.format( + "Exception in updating block report interval %s to %s", + property, newVal), rootException); + throw rootException; + } + } + break; + } + default: + break; } throw new ReconfigurationException( property, newVal, getConf().get(property)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java index 8cbd38bc60..47ac08c3b1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT; import static org.junit.Assert.assertEquals; @@ -293,4 +295,74 @@ private void testAcquireOnMaxConcurrentMoversReconfiguration( assertEquals("should not be able to get thread quota", false, dataNode.xserver.balanceThrottler.acquire()); } + + @Test + public void testBlockReportIntervalReconfiguration() + throws ReconfigurationException, IOException { + int blockReportInterval = 300 * 1000; + for (int i = 0; i < NUM_DATA_NODE; i++) { + DataNode dn = cluster.getDataNodes().get(i); + + // Try invalid values. + try { + dn.reconfigureProperty( + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, "text"); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting NumberFormatException", + expected.getCause() instanceof NumberFormatException); + } + try { + dn.reconfigureProperty( + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + String.valueOf(-1)); + fail("ReconfigurationException expected"); + } catch (ReconfigurationException expected) { + assertTrue("expecting IllegalArgumentException", + expected.getCause() instanceof IllegalArgumentException); + } + + // Change properties. + dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + String.valueOf(blockReportInterval)); + + // Verify change. + assertEquals(String.format("%s has wrong value", + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), + blockReportInterval, + dn.getDnConf().getBlockReportInterval()); + for (BPOfferService bpos : dn.getAllBpOs()) { + if (bpos != null) { + for (BPServiceActor actor : bpos.getBPServiceActors()) { + assertEquals(String.format("%s has wrong value", + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), + blockReportInterval, + actor.getScheduler().getBlockReportIntervalMs()); + } + } + } + + // Revert to default. + dn.reconfigureProperty(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, + null); + assertEquals(String.format("%s has wrong value", + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), + DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT, + dn.getDnConf().getBlockReportInterval()); + // Verify default. + for (BPOfferService bpos : dn.getAllBpOs()) { + if (bpos != null) { + for (BPServiceActor actor : bpos.getBPServiceActors()) { + assertEquals(String.format("%s has wrong value", + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), + DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT, + actor.getScheduler().getBlockReportIntervalMs()); + } + } + } + assertEquals(String.format("expect %s is not configured", + DFS_BLOCKREPORT_INTERVAL_MSEC_KEY), null, dn + .getConf().get(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY)); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index 25a46ea105..27943b7db6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -330,7 +330,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException { final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); getReconfigurableProperties("datanode", address, outs, errs); - assertEquals(3, outs.size()); + assertEquals(4, outs.size()); assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1)); }