HDFS-16413. Reconfig dfs usage parameters for datanode (#3863)
parent dc4a680da8
commit ac50657c37
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,10 +155,20 @@ boolean running() {
   /**
    * How long in between runs of the background refresh.
    */
-  long getRefreshInterval() {
+  @VisibleForTesting
+  public long getRefreshInterval() {
     return refreshInterval;
   }
 
+  /**
+   * Randomize the refresh interval timing by this amount, the actual interval will be chosen
+   * uniformly between {@code interval-jitter} and {@code interval+jitter}.
+   */
+  @VisibleForTesting
+  public long getJitter() {
+    return jitter;
+  }
+
   /**
    * Reset the current used data amount. This should be called
    * when the cached value is re-computed.
@@ -20,6 +20,10 @@
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
@@ -149,6 +153,8 @@
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
@@ -341,7 +347,9 @@ public class DataNode extends ReconfigurableBase
               DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
               DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
               DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
-              DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
+              DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
+              FS_DU_INTERVAL_KEY,
+              FS_GETSPACEUSED_JITTER_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -673,6 +681,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
       case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
       case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
         return reconfSlowDiskParameters(property, newVal);
+      case FS_DU_INTERVAL_KEY:
+      case FS_GETSPACEUSED_JITTER_KEY:
+        return reconfDfsUsageParameters(property, newVal);
       default:
         break;
     }
@@ -854,6 +865,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
     }
   }
 
+  private String reconfDfsUsageParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(FS_DU_INTERVAL_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(interval);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(interval, null);
+          }
+        }
+      } else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(jitter);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(null, jitter);
+          }
+        }
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException | IOException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
 import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    * @throws IOException
    */
   MountVolumeMap getMountVolumeMap() throws IOException;
+
+  /**
+   * Get the volume list.
+   */
+  List<FsVolumeImpl> getVolumeList();
 }
@@ -46,6 +46,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -77,6 +78,9 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
  * Taken together, all BlockPoolSlices sharing a block pool ID across a
@@ -84,7 +88,7 @@
  *
  * This class is synchronized by {@link FsVolumeImpl}.
  */
-class BlockPoolSlice {
+public class BlockPoolSlice {
   static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);
 
   private final String bpid;
@@ -115,6 +119,8 @@ class BlockPoolSlice {
   private final Timer timer;
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
+  private final Configuration config;
+  private final File bpDir;
 
   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
@@ -128,7 +134,7 @@ public int compare(File f1, File f2) {
   };
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final GetSpaceUsed dfsUsage;
+  private volatile GetSpaceUsed dfsUsage;
 
   /**
    * Create a blook pool slice
@@ -141,6 +147,8 @@ public int compare(File f1, File f2) {
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
       Configuration conf, Timer timer) throws IOException {
+    this.config = conf;
+    this.bpDir = bpDir;
     this.bpid = bpid;
     this.volume = volume;
     this.fileIoProvider = volume.getFileIoProvider();
@@ -232,6 +240,35 @@ public void run() {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
+  public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
+    // Close the old dfsUsage if it is CachingGetSpaceUsed.
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).close();
+    }
+    if (interval != null) {
+      Preconditions.checkArgument(interval > 0,
+          FS_DU_INTERVAL_KEY + " should be larger than 0");
+      config.setLong(FS_DU_INTERVAL_KEY, interval);
+    }
+    if (jitter != null) {
+      Preconditions.checkArgument(jitter >= 0,
+          FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
+      config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
+    }
+    // Start new dfsUsage.
+    this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
+        .setVolume(volume)
+        .setPath(bpDir)
+        .setConf(config)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+  }
+
+  @VisibleForTesting
+  public GetSpaceUsed getDfsUsage() {
+    return dfsUsage;
+  }
+
   private synchronized static void initializeAddReplicaPool(Configuration conf,
       FsDatasetImpl dataset) {
     if (addReplicaThreadPool == null) {
@@ -3688,5 +3688,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
       }
     }
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return volumes.getVolumes();
+  }
 }
 
|
@ -548,6 +548,10 @@ long getRecentReserved() {
|
|||||||
return recentReserved;
|
return recentReserved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<String, BlockPoolSlice> getBlockPoolSlices() {
|
||||||
|
return bpSlices;
|
||||||
|
}
|
||||||
|
|
||||||
long getReserved(){
|
long getReserved(){
|
||||||
return reserved != null ? reserved.getReserved() : 0;
|
return reserved != null ? reserved.getReserved() : 0;
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
|
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
|
||||||
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
|
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
@@ -1605,5 +1606,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
   public MountVolumeMap getMountVolumeMap() {
     return null;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
 
|
@ -19,6 +19,10 @@
|
|||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
|
||||||
@@ -49,15 +53,21 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOException
           dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
     }
   }
+
+  @Test
+  public void testDfsUsageParameters() throws ReconfigurationException {
+    String[] dfsUsageParameters = {
+        FS_DU_INTERVAL_KEY,
+        FS_GETSPACEUSED_JITTER_KEY};
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      for (String parameter : dfsUsageParameters) {
+        try {
+          dn.reconfigureProperty(parameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+
+        try {
+          dn.reconfigureProperty(parameter, String.valueOf(-1));
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting IllegalArgumentException",
+              expected.getCause() instanceof IllegalArgumentException);
+        }
+      }
+
+      // Change and verify properties.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, "99");
+      }
+      List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+        }
+      }
+
+      // Revert to default and verify.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, null);
+      }
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(String.format("expect %s is not configured",
+                FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(String.format("expect %s is not configured",
+                FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+          assertEquals(String.format("expect %s is not configured",
+              FS_DU_INTERVAL_KEY), null,
+              dn.getConf().get(FS_DU_INTERVAL_KEY));
+          assertEquals(String.format("expect %s is not configured",
+              FS_GETSPACEUSED_JITTER_KEY), null,
+              dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
+        }
+      }
+    }
+  }
 }
|
@ -25,6 +25,7 @@
|
|||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
|
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
|
||||||
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
@@ -467,4 +468,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
   public MountVolumeMap getMountVolumeMap() {
     return null;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
|
@ -339,7 +339,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException {
|
|||||||
final List<String> outs = Lists.newArrayList();
|
final List<String> outs = Lists.newArrayList();
|
||||||
final List<String> errs = Lists.newArrayList();
|
final List<String> errs = Lists.newArrayList();
|
||||||
getReconfigurableProperties("datanode", address, outs, errs);
|
getReconfigurableProperties("datanode", address, outs, errs);
|
||||||
assertEquals(16, outs.size());
|
assertEquals(18, outs.size());
|
||||||
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
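Usage note: with this change the DataNode's getSpaceUsed refresh interval and jitter can be changed without a restart. Below is a minimal sketch of how an operator would apply the new values at runtime, assuming the stock property names behind FS_DU_INTERVAL_KEY and FS_GETSPACEUSED_JITTER_KEY in core-default.xml and a placeholder DataNode address; verify the exact keys and IPC port against your cluster.

    # 1. Update the property values in the DataNode's configuration files.
    # 2. Ask the DataNode to reload the changed keys in place:
    hdfs dfsadmin -reconfig datanode <datanode_host>:<ipc_port> start

    # 3. Poll until the reconfiguration task finishes and lists the changed keys:
    hdfs dfsadmin -reconfig datanode <datanode_host>:<ipc_port> status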