HDFS-12893. [READ] Support replication of Provided blocks with non-default topologies.

Authored by Virajith Jalaparti on 2017-12-08 14:52:48 -08:00; committed by Chris Douglas
parent 0f6aa9564c
commit c89b29bd42
4 changed files with 97 additions and 11 deletions

View File

@@ -2150,6 +2150,22 @@ List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
     return datanodeDescriptors;
   }
 
+  /**
+   * Get the associated {@link DatanodeDescriptor} for the storage.
+   * If the storage is of type PROVIDED, one of the nodes that reported
+   * PROVIDED storage is returned. If not, this is equivalent to
+   * {@code storage.getDatanodeDescriptor()}.
+   * @param storage
+   * @return the associated {@link DatanodeDescriptor}.
+   */
+  private DatanodeDescriptor getDatanodeDescriptorFromStorage(
+      DatanodeStorageInfo storage) {
+    if (storage.getStorageType() == StorageType.PROVIDED) {
+      return providedStorageMap.chooseProvidedDatanode();
+    }
+    return storage.getDatanodeDescriptor();
+  }
+
   /**
    * Parse the data-nodes the block belongs to and choose a certain number
    * from them to be the recovery sources.
@@ -2198,10 +2214,14 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
     BitSet bitSet = isStriped ?
         new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      final DatanodeDescriptor node = getDatanodeDescriptorFromStorage(storage);
       final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
           storage, corruptReplicas.getNodes(block), false);
       if (state == StoredReplicaState.LIVE) {
+        if (storage.getStorageType() == StorageType.PROVIDED) {
+          storage = new DatanodeStorageInfo(node, storage.getStorageID(),
+              storage.getStorageType(), storage.getState());
+        }
         nodesContainingLiveReplicas.add(storage);
       }
       containingNodes.add(node);
@@ -4338,7 +4358,13 @@ boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
-      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      if (storage.getStorageType() == StorageType.PROVIDED
+          && storage.getState() == State.NORMAL) {
+        // assume the policy is satisfied for blocks on PROVIDED storage
+        // as long as the storage is in normal state.
+        return true;
+      }
+      final DatanodeDescriptor cur = getDatanodeDescriptorFromStorage(storage);
       // Nodes under maintenance should be counted as valid replicas from
       // rack policy point of view.
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
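Read together, the three BlockManager hunks apply one rule: a PROVIDED replica is not tied to a single datanode, so it is resolved to some datanode that reported a PROVIDED volume before it is used as a replication source or counted against the placement policy. A condensed sketch of that rule, not part of the patch, with names following the diff above:

    // Condensed sketch, not patch code: resolving a PROVIDED replica to a node.
    DatanodeDescriptor node;
    if (storage.getStorageType() == StorageType.PROVIDED) {
      // Any datanode that reported a PROVIDED volume can act as the replica's home.
      node = providedStorageMap.chooseProvidedDatanode();
      // Hand the replication code a per-node view of the shared PROVIDED storage.
      storage = new DatanodeStorageInfo(node, storage.getStorageID(),
          storage.getStorageType(), storage.getState());
    } else {
      node = storage.getDatanodeDescriptor();
    }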

View File

@@ -120,10 +120,15 @@ public void updateFromStorage(DatanodeStorage storage) {
   private boolean blockContentsStale = true;
 
   DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
+    this(dn, s.getStorageID(), s.getStorageType(), s.getState());
+  }
+
+  DatanodeStorageInfo(DatanodeDescriptor dn, String storageID,
+      StorageType storageType, State state) {
     this.dn = dn;
-    this.storageID = s.getStorageID();
-    this.storageType = s.getStorageType();
-    this.state = s.getState();
+    this.storageID = storageID;
+    this.storageType = storageType;
+    this.state = state;
   }
 
   public int getBlockReportCount() {

View File

@@ -192,7 +192,7 @@ public long getCapacity() {
   }
 
   public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
-    if (providedEnabled && storageId.equals(storage.getStorageID())) {
+    if (isProvidedStorage(storage.getStorageID())) {
       if (StorageType.PROVIDED.equals(storage.getStorageType())) {
         node.injectStorage(providedStorageInfo);
         return;
@@ -204,6 +204,22 @@ public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
     node.updateStorage(storage);
   }
 
+  private boolean isProvidedStorage(String dnStorageId) {
+    return providedEnabled && storageId.equals(dnStorageId);
+  }
+
+  /**
+   * Choose a datanode that reported a volume of {@link StorageType} PROVIDED.
+   *
+   * @return the {@link DatanodeDescriptor} corresponding to a datanode that
+   *         reported a volume with {@link StorageType} PROVIDED. If multiple
+   *         datanodes report a PROVIDED volume, one is chosen uniformly at
+   *         random.
+   */
+  public DatanodeDescriptor chooseProvidedDatanode() {
+    return providedDescriptor.chooseRandom();
+  }
+
   /**
    * Builder used for creating {@link LocatedBlocks} when a block is provided.
    */
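For context, the refactored updateStorage path runs when a datanode reports a storage whose ID matches the configured provided-storage ID. A hedged sketch of that report path; the storage ID and the two variables are placeholders, not values from the patch:

    // Sketch only: a datanode reporting a PROVIDED volume. The storage ID here is
    // a placeholder; the real one comes from the provided-storage configuration.
    DatanodeStorage reported = new DatanodeStorage("provided-storage-id",
        DatanodeStorage.State.NORMAL, StorageType.PROVIDED);
    providedStorageMap.updateStorage(datanodeDescriptor, reported);
    // With provided storage enabled and a matching ID, isProvidedStorage() holds
    // and the shared providedStorageInfo is injected into the datanode.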

View File

@@ -65,6 +65,7 @@
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -201,8 +202,15 @@ void createImage(TreeWalk t, Path out,
void startCluster(Path nspath, int numDatanodes, void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes, StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode, StorageType[][] storageTypesPerDatanode,
boolean doFormat) boolean doFormat) throws IOException {
throws IOException { startCluster(nspath, numDatanodes, storageTypes, storageTypesPerDatanode,
doFormat, null);
}
void startCluster(Path nspath, int numDatanodes,
StorageType[] storageTypes,
StorageType[][] storageTypesPerDatanode,
boolean doFormat, String[] racks) throws IOException {
conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString()); conf.set(DFS_NAMENODE_NAME_DIR_KEY, nspath.toString());
if (storageTypesPerDatanode != null) { if (storageTypesPerDatanode != null) {
@@ -211,6 +219,7 @@ void startCluster(Path nspath, int numDatanodes,
           .manageNameDfsDirs(doFormat)
           .numDataNodes(numDatanodes)
           .storageTypes(storageTypesPerDatanode)
+          .racks(racks)
           .build();
     } else if (storageTypes != null) {
       cluster = new MiniDFSCluster.Builder(conf)
@@ -219,12 +228,14 @@ void startCluster(Path nspath, int numDatanodes,
           .numDataNodes(numDatanodes)
           .storagesPerDatanode(storageTypes.length)
           .storageTypes(storageTypes)
+          .racks(racks)
           .build();
     } else {
       cluster = new MiniDFSCluster.Builder(conf)
           .format(doFormat)
           .manageNameDfsDirs(doFormat)
           .numDataNodes(numDatanodes)
+          .racks(racks)
           .build();
     }
     cluster.waitActive();
@@ -515,11 +526,12 @@ public void testSetReplicationForProvidedFiles() throws Exception {
             StorageType.PROVIDED, StorageType.DISK},
         null,
         false);
+    setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+  }
 
-    String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
+  private void setAndUnsetReplication(String filename) throws Exception {
     Path file = new Path(filename);
     FileSystem fs = cluster.getFileSystem();
     // set the replication to 4, and test that the file has
     // the required replication.
     short newReplication = 4;
@@ -833,7 +845,7 @@ public void testDatanodeLifeCycle() throws Exception {
         new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
         null, false);
 
-    int fileIndex = numFiles -1;
+    int fileIndex = numFiles - 1;
 
     final BlockManager blockManager = cluster.getNamesystem().getBlockManager();
     final DatanodeManager dnm = blockManager.getDatanodeManager();
@@ -890,4 +902,31 @@
     // reports all 3 replicas
     verifyFileLocation(fileIndex, 3);
   }
+
+  @Test
+  public void testProvidedWithHierarchicalTopology() throws Exception {
+    conf.setClass(ImageWriter.Options.UGI_CLASS, FsUGIResolver.class,
+        UGIResolver.class);
+    String packageName = "org.apache.hadoop.hdfs.server.blockmanagement";
+    String[] policies = new String[] {
+        "BlockPlacementPolicyDefault",
+        "BlockPlacementPolicyRackFaultTolerant",
+        "BlockPlacementPolicyWithNodeGroup",
+        "BlockPlacementPolicyWithUpgradeDomain"};
+    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
+        FixedBlockResolver.class);
+    String[] racks =
+        {"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
+            "/pod1/rack0", "/pod1/rack0", "/pod1/rack1", "/pod1/rack1"};
+    for (String policy: policies) {
+      LOG.info("Using policy: " + packageName + "." + policy);
+      conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
+      startCluster(NNDIRPATH, racks.length,
+          new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
+          null, false, racks);
+      verifyFileSystemContents();
+      setAndUnsetReplication("/" + filePrefix + (numFiles - 1) + fileSuffix);
+      cluster.shutdown();
+    }
+  }
 }
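The new test drives the change through each stock placement policy by setting DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, i.e. dfs.block.replicator.classname. The same selection can be made in ordinary configuration code; a minimal sketch, where the particular policy class chosen is only an example:

    // Sketch: selecting a non-default block placement policy, as the test does.
    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant");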