HDFS-10473: Allow only suitable storage policies to be set on striped files. Contributed by Uma Maheswara Rao G
This commit is contained in:
parent
4ee3543625
commit
17eae9ebb3
@ -41,6 +41,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.balancer.Matcher;
|
import org.apache.hadoop.hdfs.server.balancer.Matcher;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INode;
|
import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
@ -386,7 +387,19 @@ private void processFile(String fullPath, HdfsLocatedFileStatus status,
|
|||||||
}
|
}
|
||||||
LocatedBlock lb = lbs.get(i);
|
LocatedBlock lb = lbs.get(i);
|
||||||
if (lb.isStriped()) {
|
if (lb.isStriped()) {
|
||||||
|
if (ErasureCodingPolicyManager
|
||||||
|
.checkStoragePolicySuitableForECStripedMode(policyId)) {
|
||||||
types = policy.chooseStorageTypes((short) lb.getLocations().length);
|
types = policy.chooseStorageTypes((short) lb.getLocations().length);
|
||||||
|
} else {
|
||||||
|
// Currently we support only limited policies (HOT, COLD, ALLSSD)
|
||||||
|
// for EC striped mode files.
|
||||||
|
// Mover tool will ignore to move the blocks if the storage policy
|
||||||
|
// is not in EC Striped mode supported policies
|
||||||
|
LOG.warn("The storage policy " + policy.getName()
|
||||||
|
+ " is not suitable for Striped EC files. "
|
||||||
|
+ "So, Ignoring to move the blocks");
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
final StorageTypeDiff diff = new StorageTypeDiff(types,
|
final StorageTypeDiff diff = new StorageTypeDiff(types,
|
||||||
lb.getStorageTypes());
|
lb.getStorageTypes());
|
||||||
|
@ -53,6 +53,11 @@ public final class ErasureCodingPolicyManager {
|
|||||||
private static final ErasureCodingPolicy[] SYS_POLICIES =
|
private static final ErasureCodingPolicy[] SYS_POLICIES =
|
||||||
new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3};
|
new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3};
|
||||||
|
|
||||||
|
// Supported storage policies for striped EC files
|
||||||
|
private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] {
|
||||||
|
HdfsConstants.HOT_STORAGE_POLICY_ID, HdfsConstants.COLD_STORAGE_POLICY_ID,
|
||||||
|
HdfsConstants.ALLSSD_STORAGE_POLICY_ID };
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All active policies maintained in NN memory for fast querying,
|
* All active policies maintained in NN memory for fast querying,
|
||||||
* identified and sorted by its name.
|
* identified and sorted by its name.
|
||||||
@ -120,6 +125,21 @@ public ErasureCodingPolicy getPolicyByID(byte id) {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return True if given policy is be suitable for striped EC Files.
|
||||||
|
*/
|
||||||
|
public static boolean checkStoragePolicySuitableForECStripedMode(
|
||||||
|
byte storagePolicyID) {
|
||||||
|
boolean isPolicySuitable = false;
|
||||||
|
for (byte suitablePolicy : SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE) {
|
||||||
|
if (storagePolicyID == suitablePolicy) {
|
||||||
|
isPolicySuitable = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return isPolicySuitable;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear and clean up
|
* Clear and clean up
|
||||||
*/
|
*/
|
||||||
|
@ -36,6 +36,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
@ -496,9 +497,25 @@ public byte getLocalStoragePolicyID() {
|
|||||||
public byte getStoragePolicyID() {
|
public byte getStoragePolicyID() {
|
||||||
byte id = getLocalStoragePolicyID();
|
byte id = getLocalStoragePolicyID();
|
||||||
if (id == BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
if (id == BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||||
return this.getParent() != null ?
|
id = this.getParent() != null ?
|
||||||
this.getParent().getStoragePolicyID() : id;
|
this.getParent().getStoragePolicyID() : id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For Striped EC files, we support only suitable policies. Current
|
||||||
|
// supported policies are HOT, COLD, ALL_SSD.
|
||||||
|
// If the file was set with any other policies, then we just treat policy as
|
||||||
|
// BLOCK_STORAGE_POLICY_ID_UNSPECIFIED.
|
||||||
|
if (isStriped() && id != BLOCK_STORAGE_POLICY_ID_UNSPECIFIED
|
||||||
|
&& !ErasureCodingPolicyManager
|
||||||
|
.checkStoragePolicySuitableForECStripedMode(id)) {
|
||||||
|
id = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("The current effective storage policy id : " + id
|
||||||
|
+ " is not suitable for striped mode EC file : " + getName()
|
||||||
|
+ ". So, just returning unspecified storage policy id");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -526,6 +526,38 @@ public void testMoverWithStripedFile() throws Exception {
|
|||||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
|
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
|
||||||
dataBlocks + parityBlocks);
|
dataBlocks + parityBlocks);
|
||||||
|
|
||||||
|
// start 5 more datanodes
|
||||||
|
numOfDatanodes += 5;
|
||||||
|
capacities = new long[5][storagesPerDatanode];
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
for (int j = 0; j < storagesPerDatanode; j++) {
|
||||||
|
capacities[i][j] = capacity;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cluster.startDataNodes(conf, 5,
|
||||||
|
new StorageType[][] { { StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK } },
|
||||||
|
true, null, null, null, capacities, null, false, false, false, null);
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
|
||||||
|
// move file blocks to ONE_SSD policy
|
||||||
|
client.setStoragePolicy(barDir, "ONE_SSD");
|
||||||
|
|
||||||
|
// run Mover
|
||||||
|
rc = ToolRunner.run(conf, new Mover.Cli(), new String[] { "-p", barDir });
|
||||||
|
|
||||||
|
// verify storage types and locations
|
||||||
|
// Movements should have been ignored for the unsupported policy on
|
||||||
|
// striped file
|
||||||
|
locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
|
||||||
|
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
for (StorageType type : lb.getStorageTypes()) {
|
||||||
|
Assert.assertEquals(StorageType.ARCHIVE, type);
|
||||||
|
}
|
||||||
|
}
|
||||||
}finally{
|
}finally{
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -27,21 +27,28 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
@ -287,4 +294,82 @@ public void testDeleteOp() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests when choosing blocks on file creation of EC striped mode should
|
||||||
|
* ignore storage policy if that is not suitable. Supported storage policies
|
||||||
|
* for EC Striped mode are HOT, COLD and ALL_SSD. For all other policies set
|
||||||
|
* will be ignored and considered default policy.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testUnsuitableStoragePoliciesWithECStripedMode()
|
||||||
|
throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
int defaultStripedBlockSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE
|
||||||
|
* 4;
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultStripedBlockSize);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
|
||||||
|
false);
|
||||||
|
|
||||||
|
// start 10 datanodes
|
||||||
|
int numOfDatanodes = 10;
|
||||||
|
int storagesPerDatanode = 2;
|
||||||
|
long capacity = 10 * defaultStripedBlockSize;
|
||||||
|
long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
|
||||||
|
for (int i = 0; i < numOfDatanodes; i++) {
|
||||||
|
for (int j = 0; j < storagesPerDatanode; j++) {
|
||||||
|
capacities[i][j] = capacity;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(numOfDatanodes).storagesPerDatanode(storagesPerDatanode)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] { { StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.SSD, StorageType.DISK },
|
||||||
|
{ StorageType.DISK, StorageType.SSD },
|
||||||
|
{ StorageType.DISK, StorageType.SSD },
|
||||||
|
{ StorageType.DISK, StorageType.SSD },
|
||||||
|
{ StorageType.DISK, StorageType.SSD },
|
||||||
|
{ StorageType.DISK, StorageType.SSD } })
|
||||||
|
.storageCapacities(capacities).build();
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
// set "/foo" directory with ONE_SSD storage policy.
|
||||||
|
ClientProtocol client = NameNodeProxies.createProxy(conf,
|
||||||
|
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
||||||
|
String fooDir = "/foo";
|
||||||
|
client.mkdirs(fooDir, new FsPermission((short) 777), true);
|
||||||
|
client.setStoragePolicy(fooDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
||||||
|
// set an EC policy on "/foo" directory
|
||||||
|
client.setErasureCodingPolicy(fooDir, null);
|
||||||
|
|
||||||
|
// write file to fooDir
|
||||||
|
final String barFile = "/foo/bar";
|
||||||
|
long fileLen = 20 * defaultStripedBlockSize;
|
||||||
|
DFSTestUtil.createFile(cluster.getFileSystem(), new Path(barFile),
|
||||||
|
fileLen, (short) 3, 0);
|
||||||
|
|
||||||
|
// verify storage types and locations
|
||||||
|
LocatedBlocks locatedBlocks = client.getBlockLocations(barFile, 0,
|
||||||
|
fileLen);
|
||||||
|
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||||
|
for (StorageType type : lb.getStorageTypes()) {
|
||||||
|
Assert.assertEquals(StorageType.DISK, type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user