HDFS-15683. Allow configuring DISK/ARCHIVE capacity for individual volumes. (#2625)
This commit is contained in:
parent
19ae0faacc
commit
0e2b3086e3
@ -1557,6 +1557,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final double
|
||||
DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT = 0.0;
|
||||
|
||||
public static final String
|
||||
DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE =
|
||||
"dfs.datanode.same-disk-tiering.capacity-ratio.percentage";
|
||||
public static final String
|
||||
DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE_DEFAULT = "";
|
||||
|
||||
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
|
||||
@Deprecated
|
||||
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
|
||||
@ -739,9 +741,51 @@ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
validateVolumesWithSameDiskTiering(results);
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check conflict with same disk tiering feature
|
||||
* and throws exception.
|
||||
*
|
||||
* TODO: We can add feature to
|
||||
* allow refreshing volume with capacity ratio,
|
||||
* and solve the case of replacing volume on same mount.
|
||||
*/
|
||||
private void validateVolumesWithSameDiskTiering(ChangedVolumes
|
||||
changedVolumes) throws IOException {
|
||||
if (dnConf.getConf().getBoolean(DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
|
||||
DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT)
|
||||
&& data.getMountVolumeMap() != null) {
|
||||
// Check if mount already exist.
|
||||
for (StorageLocation location : changedVolumes.newLocations) {
|
||||
if (StorageType.allowSameDiskTiering(location.getStorageType())) {
|
||||
File dir = new File(location.getUri());
|
||||
// Get the first parent dir that exists to check disk mount point.
|
||||
while (!dir.exists()) {
|
||||
dir = dir.getParentFile();
|
||||
if (dir == null) {
|
||||
throw new IOException("Invalid path: "
|
||||
+ location + ": directory does not exist");
|
||||
}
|
||||
}
|
||||
DF df = new DF(dir, dnConf.getConf());
|
||||
String mount = df.getMount();
|
||||
if (data.getMountVolumeMap().hasMount(mount)) {
|
||||
String errMsg = "Disk mount " + mount
|
||||
+ " already has volume, when trying to add "
|
||||
+ location + ". Please try removing mounts first"
|
||||
+ " or restart datanode.";
|
||||
LOG.error(errMsg);
|
||||
throw new IOException(errMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to reload data volumes with new configuration.
|
||||
* @param newVolumes a comma separated string that specifies the data volumes.
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import java.io.File;
|
||||
@ -58,7 +60,16 @@ public class StorageLocation
|
||||
/** Regular expression that describes a storage uri with a storage type.
|
||||
* e.g. [Disk]/storages/storage1/
|
||||
*/
|
||||
private static final Pattern regex = Pattern.compile("^\\[(\\w*)\\](.+)$");
|
||||
private static final Pattern STORAGE_LOCATION_REGEX =
|
||||
Pattern.compile("^\\[(\\w*)\\](.+)$");
|
||||
|
||||
/** Regular expression for the capacity ratio of a storage volume (uri).
|
||||
* This is useful when configuring multiple
|
||||
* storage types on same disk mount (same-disk-tiering).
|
||||
* e.g. [0.3]/disk1/archive/
|
||||
*/
|
||||
private static final Pattern CAPACITY_RATIO_REGEX =
|
||||
Pattern.compile("^\\[([0-9.]*)\\](.+)$");
|
||||
|
||||
private StorageLocation(StorageType storageType, URI uri) {
|
||||
this.storageType = storageType;
|
||||
@ -127,7 +138,7 @@ public boolean matchesStorageDirectory(StorageDirectory sd,
|
||||
*/
|
||||
public static StorageLocation parse(String rawLocation)
|
||||
throws IOException, SecurityException {
|
||||
Matcher matcher = regex.matcher(rawLocation);
|
||||
Matcher matcher = STORAGE_LOCATION_REGEX.matcher(rawLocation);
|
||||
StorageType storageType = StorageType.DEFAULT;
|
||||
String location = rawLocation;
|
||||
|
||||
@ -144,6 +155,44 @@ public static StorageLocation parse(String rawLocation)
|
||||
return new StorageLocation(storageType, new Path(location).toUri());
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to parse the storage capacity ratio and related volume directory
|
||||
* out of the capacity ratio config string.
|
||||
*
|
||||
* @param capacityRatioConf Config string of the capacity ratio
|
||||
* @return Map of URI of the volume and capacity ratio.
|
||||
* @throws SecurityException when format is incorrect or ratio is not
|
||||
* between 0 - 1.
|
||||
*/
|
||||
public static Map<URI, Double> parseCapacityRatio(String capacityRatioConf)
|
||||
throws SecurityException {
|
||||
Map<URI, Double> result = new HashMap<>();
|
||||
capacityRatioConf = capacityRatioConf.replaceAll("\\s", "");
|
||||
if (capacityRatioConf.isEmpty()) {
|
||||
return result;
|
||||
}
|
||||
String[] capacityRatios = capacityRatioConf.split(",");
|
||||
for (String ratio : capacityRatios) {
|
||||
Matcher matcher = CAPACITY_RATIO_REGEX.matcher(ratio);
|
||||
if (matcher.matches()) {
|
||||
String capacityString = matcher.group(1).trim();
|
||||
String location = matcher.group(2).trim();
|
||||
double capacityRatio = Double.parseDouble(capacityString);
|
||||
if (capacityRatio > 1 || capacityRatio < 0) {
|
||||
throw new IllegalArgumentException("Capacity ratio" + capacityRatio
|
||||
+ " is not between 0 to 1: " + ratio);
|
||||
}
|
||||
result.put(new Path(location).toUri(), capacityRatio);
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"Capacity ratio config is not with correct format: "
|
||||
+ capacityRatioConf
|
||||
);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "[" + storageType + "]" + baseURI.normalize();
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
@ -680,4 +681,11 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
|
||||
* @throws IOException
|
||||
*/
|
||||
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
|
||||
|
||||
/**
|
||||
* Get relationship between disk mount and FsVolume.
|
||||
* @return Disk mount and FsVolume relationship.
|
||||
* @throws IOException
|
||||
*/
|
||||
MountVolumeMap getMountVolumeMap() throws IOException;
|
||||
}
|
||||
|
@ -193,10 +193,6 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
|
||||
}
|
||||
}
|
||||
|
||||
MountVolumeMap getMountVolumeMap() {
|
||||
return volumes.getMountVolumeMap();
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public Block getStoredBlock(String bpid, long blkid)
|
||||
throws IOException {
|
||||
@ -249,7 +245,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
}
|
||||
return info.getMetadataInputStream(0);
|
||||
}
|
||||
|
||||
|
||||
final DataNode datanode;
|
||||
private final DataNodeMetrics dataNodeMetrics;
|
||||
final DataStorage dataStorage;
|
||||
@ -3524,7 +3520,12 @@ public boolean getPinning(ExtendedBlock block) throws IOException {
|
||||
ReplicaInfo r = getBlockReplica(block);
|
||||
return r.getPinning(localFS);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public MountVolumeMap getMountVolumeMap() {
|
||||
return volumes.getMountVolumeMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDeletingBlock(String bpid, long blockId) {
|
||||
synchronized(deletingBlock) {
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -65,6 +66,7 @@ class FsVolumeList {
|
||||
|
||||
private final boolean enableSameDiskTiering;
|
||||
private final MountVolumeMap mountVolumeMap;
|
||||
private Map<URI, Double> capacityRatioMap;
|
||||
|
||||
FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
|
||||
BlockScanner blockScanner,
|
||||
@ -82,6 +84,7 @@ class FsVolumeList {
|
||||
DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
|
||||
DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
|
||||
mountVolumeMap = new MountVolumeMap(config);
|
||||
initializeCapacityRatio(config);
|
||||
}
|
||||
|
||||
MountVolumeMap getMountVolumeMap() {
|
||||
@ -135,6 +138,20 @@ FsVolumeReference getVolumeByMount(StorageType storageType,
|
||||
return null;
|
||||
}
|
||||
|
||||
private void initializeCapacityRatio(Configuration config) {
|
||||
if (capacityRatioMap == null) {
|
||||
String capacityRatioConfig = config.get(
|
||||
DFSConfigKeys
|
||||
.DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE,
|
||||
DFSConfigKeys
|
||||
.DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE_DEFAULT
|
||||
);
|
||||
|
||||
this.capacityRatioMap = StorageLocation
|
||||
.parseCapacityRatio(capacityRatioConfig);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next volume.
|
||||
*
|
||||
@ -325,11 +342,15 @@ public String toString() {
|
||||
*
|
||||
* @param ref a reference to the new FsVolumeImpl instance.
|
||||
*/
|
||||
void addVolume(FsVolumeReference ref) {
|
||||
void addVolume(FsVolumeReference ref) throws IOException {
|
||||
FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
|
||||
volumes.add(volume);
|
||||
if (isSameDiskTieringApplied(volume)) {
|
||||
mountVolumeMap.addVolume(volume);
|
||||
URI uri = volume.getStorageLocation().getUri();
|
||||
if (capacityRatioMap.containsKey(uri)) {
|
||||
mountVolumeMap.setCapacityRatio(volume, capacityRatioMap.get(uri));
|
||||
}
|
||||
}
|
||||
if (blockScanner != null) {
|
||||
blockScanner.addVolumeScanner(ref);
|
||||
|
@ -24,8 +24,8 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* MountVolumeInfo is a wrapper of
|
||||
@ -33,12 +33,15 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MountVolumeInfo {
|
||||
private final ConcurrentMap<StorageType, FsVolumeImpl>
|
||||
private final EnumMap<StorageType, FsVolumeImpl>
|
||||
storageTypeVolumeMap;
|
||||
private final EnumMap<StorageType, Double>
|
||||
capacityRatioMap;
|
||||
private double reservedForArchiveDefault;
|
||||
|
||||
MountVolumeInfo(Configuration conf) {
|
||||
storageTypeVolumeMap = new ConcurrentHashMap<>();
|
||||
storageTypeVolumeMap = new EnumMap<>(StorageType.class);
|
||||
capacityRatioMap = new EnumMap<>(StorageType.class);
|
||||
reservedForArchiveDefault = conf.getDouble(
|
||||
DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
|
||||
DFSConfigKeys
|
||||
@ -71,12 +74,22 @@ FsVolumeReference getVolumeRef(StorageType storageType) {
|
||||
|
||||
/**
|
||||
* Return configured capacity ratio.
|
||||
* If the volume is the only one on the mount,
|
||||
* return 1 to avoid unnecessary allocation.
|
||||
*
|
||||
* TODO: We should support customized capacity ratio for volumes.
|
||||
*/
|
||||
double getCapacityRatio(StorageType storageType) {
|
||||
// If capacity ratio is set, return the val.
|
||||
if (capacityRatioMap.containsKey(storageType)) {
|
||||
return capacityRatioMap.get(storageType);
|
||||
}
|
||||
// If capacity ratio is set for counterpart,
|
||||
// use the rest of capacity of the mount for it.
|
||||
if (!capacityRatioMap.isEmpty()) {
|
||||
double leftOver = 1;
|
||||
for (Map.Entry<StorageType, Double> e : capacityRatioMap.entrySet()) {
|
||||
leftOver -= e.getValue();
|
||||
}
|
||||
return leftOver;
|
||||
}
|
||||
// Use reservedForArchiveDefault by default.
|
||||
if (storageTypeVolumeMap.containsKey(storageType)
|
||||
&& storageTypeVolumeMap.size() > 1) {
|
||||
if (storageType == StorageType.ARCHIVE) {
|
||||
@ -102,9 +115,28 @@ boolean addVolume(FsVolumeImpl volume) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void removeVolume(FsVolumeImpl target) {
|
||||
storageTypeVolumeMap.remove(target.getStorageType());
|
||||
capacityRatioMap.remove(target.getStorageType());
|
||||
}
|
||||
|
||||
/**
|
||||
* Set customize capacity ratio for a storage type.
|
||||
* Return false if the value is too big.
|
||||
*/
|
||||
boolean setCapacityRatio(StorageType storageType,
|
||||
double capacityRatio) {
|
||||
double leftover = 1;
|
||||
for (Map.Entry<StorageType, Double> e : capacityRatioMap.entrySet()) {
|
||||
if (e.getKey() != storageType) {
|
||||
leftover -= e.getValue();
|
||||
}
|
||||
}
|
||||
if (leftover < capacityRatio) {
|
||||
return false;
|
||||
}
|
||||
capacityRatioMap.put(storageType, capacityRatio);
|
||||
return true;
|
||||
}
|
||||
|
||||
int size() {
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
@ -34,7 +35,7 @@
|
||||
* we don't configure multiple volumes with same storage type on one mount.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class MountVolumeMap {
|
||||
public class MountVolumeMap {
|
||||
private final ConcurrentMap<String, MountVolumeInfo>
|
||||
mountVolumeMapping;
|
||||
private final Configuration conf;
|
||||
@ -89,4 +90,24 @@ void removeVolume(FsVolumeImpl target) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setCapacityRatio(FsVolumeImpl target, double capacityRatio)
|
||||
throws IOException {
|
||||
String mount = target.getMount();
|
||||
if (!mount.isEmpty()) {
|
||||
MountVolumeInfo info = mountVolumeMapping.get(mount);
|
||||
if (!info.setCapacityRatio(
|
||||
target.getStorageType(), capacityRatio)) {
|
||||
throw new IOException(
|
||||
"Not enough capacity ratio left on mount: "
|
||||
+ mount + ", for " + target + ": capacity ratio: "
|
||||
+ capacityRatio + ". Sum of the capacity"
|
||||
+ " ratio of on same disk mount should be <= 1");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasMount(String mount) {
|
||||
return mountVolumeMapping.containsKey(mount);
|
||||
}
|
||||
}
|
||||
|
@ -6093,6 +6093,21 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.datanode.same-disk-tiering.capacity-ratio.percentage</name>
|
||||
<value></value>
|
||||
<description>
|
||||
Disk capacity ratio of DISK or ARCHIVE volume
|
||||
when dfs.datanode.same-disk-tiering is turned on
|
||||
This will override the value of
|
||||
dfs.datanode.reserve-for-archive.default.percentage .
|
||||
Example value:
|
||||
[0.3]/disk1/archive,[0.7]/disk1/disk,[0.4]/disk2/archive,[0.6]/disk2/disk
|
||||
This is only effective for configured
|
||||
DISK/ARCHIVE volumes in dfs.datanode.data.dir.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.balancer.getBlocks.hot-time-interval</name>
|
||||
<value>0</value>
|
||||
|
@ -40,6 +40,7 @@
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -1600,5 +1601,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||
}
|
||||
return Collections.unmodifiableSet(replicas);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MountVolumeMap getMountVolumeMap() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,10 +19,12 @@
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URI;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DF;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.junit.AssumptionViolatedException;
|
||||
@ -128,4 +130,35 @@ public void testDataDirFileSystem() throws Exception {
|
||||
locations = DataNode.getStorageLocations(conf);
|
||||
assertEquals(2, locations.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCapacityRatioForDataDir() {
|
||||
// Good case
|
||||
String config = "[0.9 ]/disk /2, [0.1]/disk2/1";
|
||||
Map<URI, Double> map = StorageLocation.parseCapacityRatio(config);
|
||||
assertEquals(0.9,
|
||||
map.get(new Path("/disk/2").toUri()), 0);
|
||||
assertEquals(0.1,
|
||||
map.get(new Path("/disk2/1").toUri()), 0);
|
||||
|
||||
// config without capacity ratio
|
||||
config = "[0.9 ]/disk /2, /disk2/1";
|
||||
try {
|
||||
StorageLocation.parseCapacityRatio(config);
|
||||
fail("Should fail parsing");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertTrue(e.getMessage().contains(
|
||||
"Capacity ratio config is not with correct form"));
|
||||
}
|
||||
|
||||
// config with bad capacity ratio
|
||||
config = "[11.1]/disk /2";
|
||||
try {
|
||||
StorageLocation.parseCapacityRatio(config);
|
||||
fail("Should fail parsing");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertTrue(e.getMessage().contains("is not between 0 to 1"));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -118,21 +118,7 @@ private void startDFSCluster(int numNameNodes, int numDataNodes)
|
||||
private void startDFSCluster(int numNameNodes, int numDataNodes,
|
||||
int storagePerDataNode) throws IOException {
|
||||
shutdown();
|
||||
conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
|
||||
/*
|
||||
* Lower the DN heartbeat, DF rate, and recheck interval to one second
|
||||
* so state about failures and datanode death propagates faster.
|
||||
*/
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
1000);
|
||||
/* Allow 1 volume failure */
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
|
||||
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
|
||||
0, TimeUnit.MILLISECONDS);
|
||||
conf = setConfiguration(new Configuration());
|
||||
|
||||
MiniDFSNNTopology nnTopology =
|
||||
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
|
||||
@ -145,6 +131,28 @@ private void startDFSCluster(int numNameNodes, int numDataNodes,
|
||||
cluster.waitActive();
|
||||
}
|
||||
|
||||
private Configuration setConfiguration(Configuration config) {
|
||||
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
|
||||
config.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
config.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 1);
|
||||
|
||||
/*
|
||||
* Lower the DN heartbeat, DF rate, and recheck interval to one second
|
||||
* so state about failures and datanode death propagates faster.
|
||||
*/
|
||||
config.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
config.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
|
||||
config.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
1000);
|
||||
/* Allow 1 volume failure */
|
||||
config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
|
||||
config.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
|
||||
0, TimeUnit.MILLISECONDS);
|
||||
|
||||
return config;
|
||||
}
|
||||
|
||||
private void shutdown() {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
@ -1119,4 +1127,34 @@ public void testFullBlockReportAfterRemovingVolumes()
|
||||
any(StorageBlockReport[].class),
|
||||
any(BlockReportContext.class));
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testAddVolumeWithVolumeOnSameMount()
|
||||
throws IOException {
|
||||
shutdown();
|
||||
conf = setConfiguration(new Configuration());
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE, 0.4);
|
||||
cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storagesPerDatanode(2)
|
||||
.storageTypes(new StorageType[]{StorageType.DISK, StorageType.ARCHIVE})
|
||||
.build();
|
||||
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
List<String> dirs = getDataDirs(dn);
|
||||
dirs.add(dirs.get(1) + "_2");
|
||||
|
||||
// Replace should be successful.
|
||||
try {
|
||||
String[] newVal = dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY,
|
||||
String.join(",", dirs)).split(",");
|
||||
fail("Adding mount should fail.");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e.getCause()
|
||||
.getLocalizedMessage().contains("already has volume"));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MountVolumeMap;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
@ -465,4 +466,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||
throws IOException {
|
||||
return Collections.EMPTY_SET;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MountVolumeMap getMountVolumeMap() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.util.Collections;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.fs.DF;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
@ -397,7 +398,7 @@ public void testAddVolumeWithSameDiskTiering() throws IOException {
|
||||
true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
|
||||
0.5);
|
||||
0.4);
|
||||
|
||||
when(datanode.getConf()).thenReturn(conf);
|
||||
final DNConf dnConf = new DNConf(datanode);
|
||||
@ -415,11 +416,19 @@ public void testAddVolumeWithSameDiskTiering() throws IOException {
|
||||
for (String bpid : BLOCK_POOL_IDS) {
|
||||
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
|
||||
}
|
||||
dataset.addVolume(
|
||||
createStorageWithStorageType("archive1",
|
||||
StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
|
||||
StorageLocation archive = createStorageWithStorageType("archive1",
|
||||
StorageType.ARCHIVE, conf, storage, datanode);
|
||||
dataset.addVolume(archive, nsInfos);
|
||||
assertEquals(2, dataset.getVolumeCount());
|
||||
|
||||
String mount = new DF(new File(archive.getUri()), conf).getMount();
|
||||
double archiveRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
|
||||
double diskRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
|
||||
assertEquals(0.4, archiveRatio, 0);
|
||||
assertEquals(0.6, diskRatio, 0);
|
||||
|
||||
// Add second ARCHIVAL volume should fail fsDataSetImpl.
|
||||
try {
|
||||
dataset.addVolume(
|
||||
@ -433,6 +442,106 @@ public void testAddVolumeWithSameDiskTiering() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddVolumeWithCustomizedCapacityRatio()
|
||||
throws IOException {
|
||||
datanode = mock(DataNode.class);
|
||||
storage = mock(DataStorage.class);
|
||||
this.conf = new Configuration();
|
||||
this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
|
||||
this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
|
||||
replicaCacheRootDir);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
|
||||
true);
|
||||
conf.setDouble(DFSConfigKeys
|
||||
.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
|
||||
0.5);
|
||||
|
||||
// 1) Normal case, get capacity should return correct value.
|
||||
String archivedir = "/archive1";
|
||||
String diskdir = "/disk1";
|
||||
String configStr = "[0.3]file:" + BASE_DIR + archivedir
|
||||
+ ", " + "[0.6]file:" + BASE_DIR + diskdir;
|
||||
|
||||
conf.set(DFSConfigKeys
|
||||
.DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE,
|
||||
configStr);
|
||||
|
||||
when(datanode.getConf()).thenReturn(conf);
|
||||
final DNConf dnConf = new DNConf(datanode);
|
||||
when(datanode.getDnConf()).thenReturn(dnConf);
|
||||
final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
|
||||
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
|
||||
final ShortCircuitRegistry shortCircuitRegistry =
|
||||
new ShortCircuitRegistry(conf);
|
||||
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
|
||||
|
||||
createStorageDirs(storage, conf, 0);
|
||||
|
||||
dataset = createStorageWithCapacityRatioConfig(
|
||||
configStr, archivedir, diskdir);
|
||||
|
||||
Path p = new Path("file:" + BASE_DIR);
|
||||
String mount = new DF(new File(p.toUri()), conf).getMount();
|
||||
double archiveRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
|
||||
double diskRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
|
||||
assertEquals(0.3, archiveRatio, 0);
|
||||
assertEquals(0.6, diskRatio, 0);
|
||||
|
||||
// 2) Counter part volume should get rest of the capacity
|
||||
// wihtout explicit config
|
||||
configStr = "[0.3]file:" + BASE_DIR + archivedir;
|
||||
dataset = createStorageWithCapacityRatioConfig(
|
||||
configStr, archivedir, diskdir);
|
||||
mount = new DF(new File(p.toUri()), conf).getMount();
|
||||
archiveRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.ARCHIVE);
|
||||
diskRatio = dataset.getMountVolumeMap()
|
||||
.getCapacityRatioByMountAndStorageType(mount, StorageType.DISK);
|
||||
assertEquals(0.3, archiveRatio, 0);
|
||||
assertEquals(0.7, diskRatio, 0);
|
||||
|
||||
// 3) Add volume will fail if capacity ratio is > 1
|
||||
dataset = new FsDatasetImpl(datanode, storage, conf);
|
||||
configStr = "[0.3]file:" + BASE_DIR + archivedir
|
||||
+ ", " + "[0.8]file:" + BASE_DIR + diskdir;
|
||||
|
||||
try {
|
||||
createStorageWithCapacityRatioConfig(
|
||||
configStr, archivedir, diskdir);
|
||||
fail("Should fail add volume as capacity ratio sum is > 1");
|
||||
} catch (IOException e) {
|
||||
assertTrue(e.getMessage()
|
||||
.contains("Not enough capacity ratio left on mount"));
|
||||
}
|
||||
}
|
||||
|
||||
private FsDatasetImpl createStorageWithCapacityRatioConfig(
|
||||
String configStr, String archivedir, String diskdir)
|
||||
throws IOException {
|
||||
conf.set(DFSConfigKeys
|
||||
.DFS_DATANODE_SAME_DISK_TIERING_CAPACITY_RATIO_PERCENTAGE, configStr
|
||||
);
|
||||
dataset = new FsDatasetImpl(datanode, storage, conf);
|
||||
List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
||||
for (String bpid : BLOCK_POOL_IDS) {
|
||||
nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
|
||||
}
|
||||
|
||||
StorageLocation archive = createStorageWithStorageType(
|
||||
archivedir, StorageType.ARCHIVE, conf, storage, datanode);
|
||||
|
||||
StorageLocation disk = createStorageWithStorageType(
|
||||
diskdir, StorageType.DISK, conf, storage, datanode);
|
||||
|
||||
dataset.addVolume(archive, nsInfos);
|
||||
dataset.addVolume(disk, nsInfos);
|
||||
assertEquals(2, dataset.getVolumeCount());
|
||||
return dataset;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddVolumeWithSameStorageUuid() throws IOException {
|
||||
HdfsConfiguration config = new HdfsConfiguration();
|
||||
|
Loading…
Reference in New Issue
Block a user