diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
index a437995885..362d125b09 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,10 +155,20 @@ boolean running() {
   /**
    * How long in between runs of the background refresh.
    */
-  long getRefreshInterval() {
+  @VisibleForTesting
+  public long getRefreshInterval() {
     return refreshInterval;
   }
 
+  /**
+   * Randomize the refresh interval timing by this amount, the actual interval will be chosen
+   * uniformly between {@code interval-jitter} and {@code interval+jitter}.
+   */
+  @VisibleForTesting
+  public long getJitter() {
+    return jitter;
+  }
+
   /**
    * Reset the current used data amount. This should be called
    * when the cached value is re-computed.
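The new getJitter() accessor above exposes the jitter that CachingGetSpaceUsed applies to its refresh loop. As a reader aid only (this is not the Hadoop refresh-thread code), here is a minimal sketch of a delay drawn uniformly in [interval - jitter, interval + jitter], as the javadoc describes:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustration only: how a refresh delay randomized by +/- jitter can be drawn.
public final class JitteredIntervalSketch {
  static long nextDelayMs(long intervalMs, long jitterMs) {
    if (jitterMs <= 0) {
      return intervalMs;                 // jitter disabled
    }
    // nextLong(origin, bound) excludes bound, hence the + 1.
    return ThreadLocalRandom.current()
        .nextLong(intervalMs - jitterMs, intervalMs + jitterMs + 1);
  }

  public static void main(String[] args) {
    // e.g. fs.du.interval = 600000 ms, fs.getspaceused.jitter = 60000 ms
    System.out.println(nextDelayMs(600_000L, 60_000L));
  }
}
```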
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 15e8a9e359..c3b1aa1c67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -20,6 +20,10 @@
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
@@ -149,6 +153,8 @@
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -341,7 +347,9 @@ public class DataNode extends ReconfigurableBase
           DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
           DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
           DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
-          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
+          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
+          FS_DU_INTERVAL_KEY,
+          FS_GETSPACEUSED_JITTER_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -673,6 +681,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
     case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
     case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
       return reconfSlowDiskParameters(property, newVal);
+    case FS_DU_INTERVAL_KEY:
+    case FS_GETSPACEUSED_JITTER_KEY:
+      return reconfDfsUsageParameters(property, newVal);
     default:
       break;
     }
@@ -854,6 +865,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
     }
   }
 
+  private String reconfDfsUsageParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(FS_DU_INTERVAL_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(interval);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(interval, null);
+          }
+        }
+      } else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(jitter);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(null, jitter);
+          }
+        }
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException | IOException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
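For context, reconfigureProperty() on ReconfigurableBase is the public entry point that dispatches to the new reconfDfsUsageParameters() case above. A minimal sketch of exercising it (the `dn` handle and the values are illustrative; the full test added later in this patch covers the same flow, and in production the change is normally driven through `hdfs dfsadmin -reconfig datanode <host:ipc_port> start`):

```java
// Assumes a DataNode `dn`, e.g. obtained from a MiniDFSCluster in a test,
// and the FS_DU_INTERVAL_KEY / FS_GETSPACEUSED_JITTER_KEY static imports.
dn.reconfigureProperty(FS_DU_INTERVAL_KEY, "30000");        // refresh cached du every 30s
dn.reconfigureProperty(FS_GETSPACEUSED_JITTER_KEY, "5000"); // spread refreshes by +/- 5s
dn.reconfigureProperty(FS_DU_INTERVAL_KEY, null);           // null reverts to the default
```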
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index e39ef817b6..8d1d10bccd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    * @throws IOException
    */
   MountVolumeMap getMountVolumeMap() throws IOException;
+
+  /**
+   * Get the volume list.
+   */
+  List<FsVolumeImpl> getVolumeList();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index eff079a353..8357b860b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -46,6 +46,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,9 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
  * Taken together, all BlockPoolSlices sharing a block pool ID across a
@@ -84,7 +88,7 @@
  *
  * This class is synchronized by {@link FsVolumeImpl}.
  */
-class BlockPoolSlice {
+public class BlockPoolSlice {
   static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);
 
   private final String bpid;
@@ -115,6 +119,8 @@ class BlockPoolSlice {
   private final Timer timer;
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
+  private final Configuration config;
+  private final File bpDir;
 
   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
@@ -128,7 +134,7 @@ public int compare(File f1, File f2) {
   };
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final GetSpaceUsed dfsUsage;
+  private volatile GetSpaceUsed dfsUsage;
 
   /**
    * Create a blook pool slice
@@ -141,6 +147,8 @@ public int compare(File f1, File f2) {
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
       Configuration conf, Timer timer) throws IOException {
+    this.config = conf;
+    this.bpDir = bpDir;
     this.bpid = bpid;
     this.volume = volume;
     this.fileIoProvider = volume.getFileIoProvider();
@@ -232,6 +240,35 @@ public void run() {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
+  public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
+    // Close the old dfsUsage if it is CachingGetSpaceUsed.
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).close();
+    }
+    if (interval != null) {
+      Preconditions.checkArgument(interval > 0,
+          FS_DU_INTERVAL_KEY + " should be larger than 0");
+      config.setLong(FS_DU_INTERVAL_KEY, interval);
+    }
+    if (jitter != null) {
+      Preconditions.checkArgument(jitter >= 0,
+          FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
+      config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
+    }
+    // Start new dfsUsage.
+    this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
+        .setVolume(volume)
+        .setPath(bpDir)
+        .setConf(config)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+  }
+
+  @VisibleForTesting
+  public GetSpaceUsed getDfsUsage() {
+    return dfsUsage;
+  }
+
   private synchronized static void initializeAddReplicaPool(Configuration conf,
       FsDatasetImpl dataset) {
     if (addReplicaThreadPool == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 002d99abc5..aaf37aa09c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3688,5 +3688,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
 
       }
     }
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return volumes.getVolumes();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 8f15d8a709..806afbdb2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -548,6 +548,10 @@ long getRecentReserved() {
     return recentReserved;
   }
 
+  public Map<String, BlockPoolSlice> getBlockPoolSlices() {
+    return bpSlices;
+  }
+
   long getReserved(){
     return reserved != null ? reserved.getReserved() : 0;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 3313c7c7a0..29eb051cb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -42,6 +42,7 @@
 
 import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
 import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
 import org.apache.commons.lang3.ArrayUtils;
@@ -1605,5 +1606,10 @@ public Set deepCopyReplica(String bpid)
 
   public MountVolumeMap getMountVolumeMap() {
     return null;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
index 1a9d6024ac..172a44557c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -49,15 +53,21 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOException
         dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
     }
   }
+
+  @Test
+  public void testDfsUsageParameters() throws ReconfigurationException {
+    String[] dfsUsageParameters = {
+        FS_DU_INTERVAL_KEY,
+        FS_GETSPACEUSED_JITTER_KEY};
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      for (String parameter : dfsUsageParameters) {
+        try {
+          dn.reconfigureProperty(parameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+
+        try {
+          dn.reconfigureProperty(parameter, String.valueOf(-1));
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting IllegalArgumentException",
+              expected.getCause() instanceof IllegalArgumentException);
+        }
+      }
+
+      // Change and verify properties.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, "99");
+      }
+      List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+        }
+      }
+
+      // Revert to default and verify.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, null);
+      }
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(String.format("expect %s is not configured",
+                FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(String.format("expect %s is not configured",
+                FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+          assertEquals(String.format("expect %s is not configured",
+              FS_DU_INTERVAL_KEY), null,
+              dn.getConf().get(FS_DU_INTERVAL_KEY));
+          assertEquals(String.format("expect %s is not configured",
+              FS_GETSPACEUSED_JITTER_KEY), null,
+              dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
+        }
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 1c6597eb45..77e2e2077d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -467,4 +468,9 @@ public Set deepCopyReplica(String bpid)
   public MountVolumeMap getMountVolumeMap() {
     return null;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 0f8c4cdc8a..648fb854e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -339,7 +339,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(16, outs.size());
+    assertEquals(18, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
 
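Operationally, the two new keys behave like the other reconfigurable DataNode properties: update the DataNode's hdfs-site.xml and start a reconfiguration task, then poll its status. A hedged sketch of driving that programmatically through DFSAdmin (the host:port is a placeholder for the DataNode IPC address; the CLI equivalent is `hdfs dfsadmin -reconfig datanode <host:ipc_port> start` followed by `... status`):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

// Illustration only: programmatic equivalent of the dfsadmin CLI calls.
public class ReconfigDfsUsageSketch {
  public static void main(String[] args) throws Exception {
    DFSAdmin admin = new DFSAdmin(new Configuration());
    // Kick off a reconfiguration task on the DataNode, then check its progress.
    admin.run(new String[] {"-reconfig", "datanode", "dn-host:9867", "start"});
    admin.run(new String[] {"-reconfig", "datanode", "dn-host:9867", "status"});
  }
}
```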