HDFS-14512. ONE_SSD policy will be violated while write data with DistributedFileSystem.create(....favoredNodes). Contributed by Ayush Saxena.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
Authored by Ayush Saxena on 2019-05-29 20:52:58 -07:00; committed by Wei-Chiu Chuang
parent 9ad7cad205
commit c1caab40f2
3 changed files with 66 additions and 10 deletions
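Background, as an editorial illustration (not part of the commit): when a file is created under a ONE_SSD directory and favored nodes are passed to create(), the favored-node targets are chosen first, and the remaining targets were then chosen against a fresh, empty result list (the `new ArrayList<DatanodeStorageInfo>(numOfReplicas)` visible below), so the required storage types were recomputed without knowing that an SSD had already been used and a second SSD replica could be picked. The patch threads the partially consumed storage-type counts (an EnumMap<StorageType, Integer>) into the follow-up chooseTarget call. The sketch below shows the client-side call pattern that exposes the issue, mirroring the test added by this commit; the NameNode URI, datanode address, and block/buffer sizes are assumptions for illustration only.

import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class OneSsdFavoredNodesRepro {
  public static void main(String[] args) throws Exception {
    // Assumed for illustration: a running cluster at hdfs://localhost:8020
    // whose datanodes have both DISK and SSD storage configured.
    Configuration conf = new Configuration();
    DistributedFileSystem fs = (DistributedFileSystem) FileSystem
        .get(URI.create("hdfs://localhost:8020"), conf);

    Path dir = new Path("/tmp");
    Path file = new Path(dir, "file1");
    fs.mkdirs(dir);
    // ONE_SSD: one replica on SSD, the remaining replicas on DISK.
    fs.setStoragePolicy(dir, "ONE_SSD");

    // Favored node: an assumed datanode transfer address (host:port).
    InetSocketAddress[] favored =
        {new InetSocketAddress("dn1.example.com", 9866)};

    // DistributedFileSystem#create overload that accepts favored nodes.
    try (FSDataOutputStream out = fs.create(file, FsPermission.getDefault(),
        false, 4096, (short) 3, 128 * 1024 * 1024, null, favored)) {
      out.write("Some Bytes".getBytes());
    }

    // Count SSD replicas of the first block. Before HDFS-14512 this could be
    // greater than 1 when favored nodes were supplied; with the fix it is 1.
    BlockLocation[] locs = fs.getFileBlockLocations(file, 0, Long.MAX_VALUE);
    int ssdReplicas = Collections.frequency(
        Arrays.asList(locs[0].getStorageTypes()), StorageType.SSD);
    System.out.println("SSD replicas for first block: " + ssdReplicas);
  }
}

With the fix applied, the first block should report exactly one SSD replica, which is what the test added at the end of this commit asserts.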


@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +102,17 @@ DatanodeStorageInfo[] chooseTarget(String src,
         excludedNodes, blocksize, storagePolicy, flags);
   }
 
+  /**
+   * @param storageTypes storage types that should be used as targets.
+   */
+  public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
+      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
+      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
+    return chooseTarget(srcPath, numOfReplicas, writer, chosen,
+        returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
+  }
+
   /**
    * Verify if the block's placement meets requirement of placement policy,
    * i.e. replicas are placed on no less than minRacks racks in the system.


@@ -150,7 +150,16 @@ public DatanodeStorageInfo[] chooseTarget(String srcPath,
       final BlockStoragePolicy storagePolicy,
       EnumSet<AddBlockFlag> flags) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
-        excludedNodes, blocksize, storagePolicy, flags);
+        excludedNodes, blocksize, storagePolicy, flags, null);
+  }
+
+  @Override
+  public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas,
+      Node writer, List<DatanodeStorageInfo> chosen, boolean returnChosenNodes,
+      Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy,
+      EnumSet<AddBlockFlag> flags, EnumMap<StorageType, Integer> storageTypes) {
+    return chooseTarget(numOfReplicas, writer, chosen, returnChosenNodes,
+        excludedNodes, blocksize, storagePolicy, flags, storageTypes);
   }
 
   @Override
@@ -202,7 +211,8 @@ DatanodeStorageInfo[] chooseTarget(String src,
       DatanodeStorageInfo[] remainingTargets =
           chooseTarget(src, numOfReplicas, writer,
               new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-              favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
+              favoriteAndExcludedNodes, blocksize, storagePolicy, flags,
+              storageTypes);
       for (int i = 0; i < remainingTargets.length; i++) {
         results.add(remainingTargets[i]);
       }
@@ -252,7 +262,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
       Set<Node> excludedNodes,
       long blocksize,
       final BlockStoragePolicy storagePolicy,
-      EnumSet<AddBlockFlag> addBlockFlags) {
+      EnumSet<AddBlockFlag> addBlockFlags,
+      EnumMap<StorageType, Integer> sTypes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return DatanodeStorageInfo.EMPTY_ARRAY;
     }
@@ -290,7 +301,7 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
         localNode = chooseTarget(numOfReplicas, writer,
             excludedNodeCopy, blocksize, maxNodesPerRack, results,
             avoidStaleNodes, storagePolicy,
-            EnumSet.noneOf(StorageType.class), results.isEmpty());
+            EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
         if (results.size() < numOfReplicas) {
           // not enough nodes; discard results and fall back
           results = null;
@@ -300,7 +311,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
         results = new ArrayList<>(chosenStorage);
         localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
             blocksize, maxNodesPerRack, results, avoidStaleNodes,
-            storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty());
+            storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
+            sTypes);
       }
 
       if (!returnChosenNodes) {
@@ -380,6 +392,7 @@ private EnumMap<StorageType, Integer> getRequiredStorageTypes(
    * @param maxNodesPerRack max nodes allowed per rack
    * @param results the target nodes already chosen
    * @param avoidStaleNodes avoid stale nodes in replica choosing
+   * @param storageTypes storage type to be considered for target
    * @return local node of writer (not chosen node)
    */
   private Node chooseTarget(int numOfReplicas,
@@ -391,7 +404,8 @@ private Node chooseTarget(int numOfReplicas,
                             final boolean avoidStaleNodes,
                             final BlockStoragePolicy storagePolicy,
                             final EnumSet<StorageType> unavailableStorages,
-                            final boolean newBlock) {
+                            final boolean newBlock,
+                            EnumMap<StorageType, Integer> storageTypes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return (writer instanceof DatanodeDescriptor) ? writer : null;
     }
@@ -409,8 +423,9 @@ private Node chooseTarget(int numOfReplicas,
           .chooseStorageTypes((short) totalReplicasExpected,
               DatanodeStorageInfo.toStorageTypes(results),
               unavailableStorages, newBlock);
-      final EnumMap<StorageType, Integer> storageTypes =
-          getRequiredStorageTypes(requiredStorageTypes);
+      if (storageTypes == null) {
+        storageTypes = getRequiredStorageTypes(requiredStorageTypes);
+      }
       if (LOG.isTraceEnabled()) {
         LOG.trace("storageTypes=" + storageTypes);
       }
@@ -453,7 +468,7 @@ private Node chooseTarget(int numOfReplicas,
       numOfReplicas = totalReplicasExpected - results.size();
       return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
           maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
-          newBlock);
+          newBlock, null);
     }
 
     boolean retry = false;
@@ -473,7 +488,7 @@ private Node chooseTarget(int numOfReplicas,
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
-            newBlock);
+            newBlock, null);
       }
     }
     return writer;


@@ -40,6 +40,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -50,6 +51,7 @@
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
@@ -80,6 +82,7 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -1979,4 +1982,30 @@ public Object run() throws Exception {
       }
     }
   }
+
+  @Test
+  public void testStorageFavouredNodes()
+      throws IOException, InterruptedException, TimeoutException {
+    Configuration conf = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK})
+        .numDataNodes(3).storagesPerDatanode(2).build()) {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path file1 = new Path("/tmp/file1");
+      fs.mkdirs(new Path("/tmp"));
+      fs.setStoragePolicy(new Path("/tmp"), "ONE_SSD");
+      InetSocketAddress[] addrs =
+          {cluster.getDataNodes().get(0).getXferAddress()};
+      HdfsDataOutputStream stream = fs.create(file1, FsPermission.getDefault(),
+          false, 1024, (short) 3, 1024, null, addrs);
+      stream.write("Some Bytes".getBytes());
+      stream.close();
+      DFSTestUtil.waitReplication(fs, file1, (short) 3);
+      BlockLocation[] locations = fs.getClient()
+          .getBlockLocations(file1.toUri().getPath(), 0, Long.MAX_VALUE);
+      int numSSD = Collections.frequency(
+          Arrays.asList(locations[0].getStorageTypes()), StorageType.SSD);
+      assertEquals("Number of SSD should be 1 but was : " + numSSD, 1, numSSD);
+    }
+  }
 }