HDFS-12778. [READ] Report multiple locations for PROVIDED blocks

This commit is contained in:
Virajith Jalaparti 2017-11-21 14:54:57 -08:00 committed by Chris Douglas
parent 3b1d30301b
commit 3d3be87e30
3 changed files with 151 additions and 128 deletions

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -72,6 +71,7 @@ public class ProvidedStorageMap {
private final DatanodeStorageInfo providedStorageInfo; private final DatanodeStorageInfo providedStorageInfo;
private boolean providedEnabled; private boolean providedEnabled;
private long capacity; private long capacity;
private int defaultReplication;
ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf) ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
throws IOException { throws IOException {
@ -95,6 +95,8 @@ public class ProvidedStorageMap {
storageId, State.NORMAL, StorageType.PROVIDED); storageId, State.NORMAL, StorageType.PROVIDED);
providedDescriptor = new ProvidedDescriptor(); providedDescriptor = new ProvidedDescriptor();
providedStorageInfo = providedDescriptor.createProvidedStorage(ds); providedStorageInfo = providedDescriptor.createProvidedStorage(ds);
this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.bm = bm; this.bm = bm;
this.lock = lock; this.lock = lock;
@ -198,63 +200,72 @@ public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
*/ */
class ProvidedBlocksBuilder extends LocatedBlockBuilder { class ProvidedBlocksBuilder extends LocatedBlockBuilder {
private ShadowDatanodeInfoWithStorage pending;
private boolean hasProvidedLocations;
ProvidedBlocksBuilder(int maxBlocks) { ProvidedBlocksBuilder(int maxBlocks) {
super(maxBlocks); super(maxBlocks);
pending = new ShadowDatanodeInfoWithStorage( }
providedDescriptor, storageId);
hasProvidedLocations = false; private DatanodeDescriptor chooseProvidedDatanode(
Set<String> excludedUUids) {
DatanodeDescriptor dn = providedDescriptor.choose(null, excludedUUids);
if (dn == null) {
dn = providedDescriptor.choose(null);
}
return dn;
} }
@Override @Override
LocatedBlock newLocatedBlock(ExtendedBlock eb, LocatedBlock newLocatedBlock(ExtendedBlock eb,
DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) { DatanodeStorageInfo[] storages, long pos, boolean isCorrupt) {
DatanodeInfoWithStorage[] locs = List<DatanodeInfoWithStorage> locs = new ArrayList<>();
new DatanodeInfoWithStorage[storages.length]; List<String> sids = new ArrayList<>();
String[] sids = new String[storages.length]; List<StorageType> types = new ArrayList<>();
StorageType[] types = new StorageType[storages.length]; boolean isProvidedBlock = false;
Set<String> excludedUUids = new HashSet<>();
for (int i = 0; i < storages.length; ++i) { for (int i = 0; i < storages.length; ++i) {
sids[i] = storages[i].getStorageID(); DatanodeStorageInfo currInfo = storages[i];
types[i] = storages[i].getStorageType(); StorageType storageType = currInfo.getStorageType();
if (StorageType.PROVIDED.equals(storages[i].getStorageType())) { sids.add(currInfo.getStorageID());
locs[i] = pending; types.add(storageType);
hasProvidedLocations = true; if (StorageType.PROVIDED.equals(storageType)) {
DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
locs.add(
new DatanodeInfoWithStorage(
dn, currInfo.getStorageID(), currInfo.getStorageType()));
excludedUUids.add(dn.getDatanodeUuid());
isProvidedBlock = true;
} else { } else {
locs[i] = new DatanodeInfoWithStorage( locs.add(new DatanodeInfoWithStorage(
storages[i].getDatanodeDescriptor(), sids[i], types[i]); currInfo.getDatanodeDescriptor(),
currInfo.getStorageID(), storageType));
excludedUUids.add(currInfo.getDatanodeDescriptor().getDatanodeUuid());
} }
} }
return new LocatedBlock(eb, locs, sids, types, pos, isCorrupt, null);
int numLocations = locs.size();
if (isProvidedBlock) {
// add more replicas until we reach the defaultReplication
for (int count = numLocations + 1;
count <= defaultReplication && count <= providedDescriptor
.activeProvidedDatanodes(); count++) {
DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
locs.add(new DatanodeInfoWithStorage(
dn, storageId, StorageType.PROVIDED));
sids.add(storageId);
types.add(StorageType.PROVIDED);
excludedUUids.add(dn.getDatanodeUuid());
}
}
return new LocatedBlock(eb,
locs.toArray(new DatanodeInfoWithStorage[locs.size()]),
sids.toArray(new String[sids.size()]),
types.toArray(new StorageType[types.size()]),
pos, isCorrupt, null);
} }
@Override @Override
LocatedBlocks build(DatanodeDescriptor client) { LocatedBlocks build(DatanodeDescriptor client) {
// TODO: to support multiple provided storages, need to pass/maintain map
if (hasProvidedLocations) {
// set all fields of pending DatanodeInfo
List<String> excludedUUids = new ArrayList<String>();
for (LocatedBlock b : blocks) {
DatanodeInfo[] infos = b.getLocations();
StorageType[] types = b.getStorageTypes();
for (int i = 0; i < types.length; i++) {
if (!StorageType.PROVIDED.equals(types[i])) {
excludedUUids.add(infos[i].getDatanodeUuid());
}
}
}
DatanodeDescriptor dn =
providedDescriptor.choose(client, excludedUUids);
if (dn == null) {
dn = providedDescriptor.choose(client);
}
pending.replaceInternal(dn);
}
return new LocatedBlocks( return new LocatedBlocks(
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy); flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
} }
@ -265,53 +276,6 @@ LocatedBlocks build() {
} }
} }
/**
* An abstract {@link DatanodeInfoWithStorage} to represent provided storage.
*/
static class ShadowDatanodeInfoWithStorage extends DatanodeInfoWithStorage {
private String shadowUuid;
ShadowDatanodeInfoWithStorage(DatanodeDescriptor d, String storageId) {
super(d, storageId, StorageType.PROVIDED);
}
@Override
public String getDatanodeUuid() {
return shadowUuid;
}
public void setDatanodeUuid(String uuid) {
shadowUuid = uuid;
}
void replaceInternal(DatanodeDescriptor dn) {
updateRegInfo(dn); // overwrite DatanodeID (except UUID)
setDatanodeUuid(dn.getDatanodeUuid());
setCapacity(dn.getCapacity());
setDfsUsed(dn.getDfsUsed());
setRemaining(dn.getRemaining());
setBlockPoolUsed(dn.getBlockPoolUsed());
setCacheCapacity(dn.getCacheCapacity());
setCacheUsed(dn.getCacheUsed());
setLastUpdate(dn.getLastUpdate());
setLastUpdateMonotonic(dn.getLastUpdateMonotonic());
setXceiverCount(dn.getXceiverCount());
setNetworkLocation(dn.getNetworkLocation());
adminState = dn.getAdminState();
setUpgradeDomain(dn.getUpgradeDomain());
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public int hashCode() {
return super.hashCode();
}
}
/** /**
* An abstract DatanodeDescriptor to track datanodes with provided storages. * An abstract DatanodeDescriptor to track datanodes with provided storages.
* NOTE: never resolved through registerDatanode, so not in the topology. * NOTE: never resolved through registerDatanode, so not in the topology.
@ -336,6 +300,7 @@ public static class ProvidedDescriptor extends DatanodeDescriptor {
DatanodeStorageInfo getProvidedStorage( DatanodeStorageInfo getProvidedStorage(
DatanodeDescriptor dn, DatanodeStorage s) { DatanodeDescriptor dn, DatanodeStorage s) {
LOG.info("XXXXX adding Datanode " + dn.getDatanodeUuid());
dns.put(dn.getDatanodeUuid(), dn); dns.put(dn.getDatanodeUuid(), dn);
// TODO: maintain separate RPC ident per dn // TODO: maintain separate RPC ident per dn
return storageMap.get(s.getStorageID()); return storageMap.get(s.getStorageID());
@ -352,7 +317,7 @@ DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
DatanodeDescriptor choose(DatanodeDescriptor client) { DatanodeDescriptor choose(DatanodeDescriptor client) {
// exact match for now // exact match for now
DatanodeDescriptor dn = client != null ? DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null; dns.get(client.getDatanodeUuid()) : null;
if (null == dn) { if (null == dn) {
dn = chooseRandom(); dn = chooseRandom();
} }
@ -360,10 +325,10 @@ DatanodeDescriptor choose(DatanodeDescriptor client) {
} }
DatanodeDescriptor choose(DatanodeDescriptor client, DatanodeDescriptor choose(DatanodeDescriptor client,
List<String> excludedUUids) { Set<String> excludedUUids) {
// exact match for now // exact match for now
DatanodeDescriptor dn = client != null ? DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null; dns.get(client.getDatanodeUuid()) : null;
if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) { if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
dn = null; dn = null;

View File

@ -34,6 +34,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
"hdfs.image.writer.resolver.fixed.block.size"; "hdfs.image.writer.resolver.fixed.block.size";
public static final String START_BLOCK = public static final String START_BLOCK =
"hdfs.image.writer.resolver.fixed.block.start"; "hdfs.image.writer.resolver.fixed.block.start";
public static final long BLOCKSIZE_DEFAULT = 256 * (1L << 20);
private Configuration conf; private Configuration conf;
private long blocksize = 256 * (1L << 20); private long blocksize = 256 * (1L << 20);
@ -42,7 +43,7 @@ public class FixedBlockResolver extends BlockResolver implements Configurable {
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
this.conf = conf; this.conf = conf;
blocksize = conf.getLong(BLOCKSIZE, 256 * (1L << 20)); blocksize = conf.getLong(BLOCKSIZE, BLOCKSIZE_DEFAULT);
blockIds.set(conf.getLong(START_BLOCK, (1L << 30))); blockIds.set(conf.getLong(START_BLOCK, (1L << 30)));
} }

View File

@ -474,12 +474,12 @@ public void testClusterWithEmptyImage() throws IOException {
} }
private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client, private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
String filename, int expectedLocations) throws IOException { String filename, long fileLen, long expectedBlocks, int expectedLocations)
LocatedBlocks locatedBlocks = client.getLocatedBlocks( throws IOException {
filename, 0, baseFileLen); LocatedBlocks locatedBlocks = client.getLocatedBlocks(filename, 0, fileLen);
//given the start and length in the above call, // given the start and length in the above call,
//only one LocatedBlock in LocatedBlocks // only one LocatedBlock in LocatedBlocks
assertEquals(1, locatedBlocks.getLocatedBlocks().size()); assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0); LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
assertEquals(expectedLocations, locatedBlock.getLocations().length); assertEquals(expectedLocations, locatedBlock.getLocations().length);
return locatedBlock.getLocations(); return locatedBlock.getLocations();
@ -513,17 +513,20 @@ public void testSetReplicationForProvidedFiles() throws Exception {
file, newReplication, 10000); file, newReplication, 10000);
DFSClient client = new DFSClient(new InetSocketAddress("localhost", DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0)); cluster.getNameNodePort()), cluster.getConfiguration(0));
getAndCheckBlockLocations(client, filename, newReplication); getAndCheckBlockLocations(client, filename, baseFileLen, 1, newReplication);
// set the replication back to 1 // set the replication back to 1
newReplication = 1; newReplication = 1;
LOG.info("Setting replication of file {} back to {}", LOG.info("Setting replication of file {} back to {}",
filename, newReplication); filename, newReplication);
fs.setReplication(file, newReplication); fs.setReplication(file, newReplication);
// defaultReplication number of replicas should be returned
int defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
DFSTestUtil.waitForReplication((DistributedFileSystem) fs, DFSTestUtil.waitForReplication((DistributedFileSystem) fs,
file, newReplication, 10000); file, (short) defaultReplication, 10000);
// the only replica left should be the PROVIDED datanode getAndCheckBlockLocations(client, filename, baseFileLen, 1,
getAndCheckBlockLocations(client, filename, newReplication); defaultReplication);
} }
@Test(timeout=30000) @Test(timeout=30000)
@ -545,8 +548,9 @@ public void testProvidedDatanodeFailures() throws Exception {
if (numFiles >= 1) { if (numFiles >= 1) {
String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix; String filename = "/" + filePrefix + (numFiles - 1) + fileSuffix;
// 2 locations returned as there are 2 PROVIDED datanodes
DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1); DatanodeInfo[] dnInfos =
getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
//the location should be one of the provided DNs available //the location should be one of the provided DNs available
assertTrue( assertTrue(
dnInfos[0].getDatanodeUuid().equals( dnInfos[0].getDatanodeUuid().equals(
@ -564,7 +568,7 @@ public void testProvidedDatanodeFailures() throws Exception {
providedDatanode1.getDatanodeId().getXferAddr()); providedDatanode1.getDatanodeId().getXferAddr());
//should find the block on the 2nd provided datanode //should find the block on the 2nd provided datanode
dnInfos = getAndCheckBlockLocations(client, filename, 1); dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
assertEquals(providedDatanode2.getDatanodeUuid(), assertEquals(providedDatanode2.getDatanodeUuid(),
dnInfos[0].getDatanodeUuid()); dnInfos[0].getDatanodeUuid());
@ -575,14 +579,14 @@ public void testProvidedDatanodeFailures() throws Exception {
BlockManagerTestUtil.noticeDeadDatanode( BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(), cluster.getNameNode(),
providedDatanode2.getDatanodeId().getXferAddr()); providedDatanode2.getDatanodeId().getXferAddr());
getAndCheckBlockLocations(client, filename, 0); getAndCheckBlockLocations(client, filename, baseFileLen, 1, 0);
//restart the provided datanode //restart the provided datanode
cluster.restartDataNode(providedDNProperties1, true); cluster.restartDataNode(providedDNProperties1, true);
cluster.waitActive(); cluster.waitActive();
//should find the block on the 1st provided datanode now //should find the block on the 1st provided datanode now
dnInfos = getAndCheckBlockLocations(client, filename, 1); dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
//not comparing UUIDs as the datanode can now have a different one. //not comparing UUIDs as the datanode can now have a different one.
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(), assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
dnInfos[0].getXferAddr()); dnInfos[0].getXferAddr());
@ -593,20 +597,18 @@ public void testProvidedDatanodeFailures() throws Exception {
public void testTransientDeadDatanodes() throws Exception { public void testTransientDeadDatanodes() throws Exception {
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class); FixedBlockResolver.class);
// 2 Datanodes, 1 PROVIDED and other DISK // 3 Datanodes, 2 PROVIDED and other DISK
startCluster(NNDIRPATH, 2, null, startCluster(NNDIRPATH, 3, null,
new StorageType[][] { new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK}, {StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}}, {StorageType.DISK}},
false); false);
DataNode providedDatanode = cluster.getDataNodes().get(0); DataNode providedDatanode = cluster.getDataNodes().get(0);
DFSClient client = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()), cluster.getConfiguration(0));
for (int i= 0; i < numFiles; i++) { for (int i= 0; i < numFiles; i++) {
verifyFileLocation(i); // expect to have 2 locations as we have 2 provided Datanodes.
verifyFileLocation(i, 2);
// NameNode thinks the datanode is down // NameNode thinks the datanode is down
BlockManagerTestUtil.noticeDeadDatanode( BlockManagerTestUtil.noticeDeadDatanode(
cluster.getNameNode(), cluster.getNameNode(),
@ -614,7 +616,7 @@ public void testTransientDeadDatanodes() throws Exception {
cluster.waitActive(); cluster.waitActive();
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();
Thread.sleep(1000); Thread.sleep(1000);
verifyFileLocation(i); verifyFileLocation(i, 2);
} }
} }
@ -622,17 +624,18 @@ public void testTransientDeadDatanodes() throws Exception {
public void testNamenodeRestart() throws Exception { public void testNamenodeRestart() throws Exception {
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH, createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class); FixedBlockResolver.class);
// 2 Datanodes, 1 PROVIDED and other DISK // 3 Datanodes, 2 PROVIDED and other DISK
startCluster(NNDIRPATH, 2, null, startCluster(NNDIRPATH, 3, null,
new StorageType[][] { new StorageType[][] {
{StorageType.PROVIDED, StorageType.DISK},
{StorageType.PROVIDED, StorageType.DISK}, {StorageType.PROVIDED, StorageType.DISK},
{StorageType.DISK}}, {StorageType.DISK}},
false); false);
verifyFileLocation(numFiles - 1); verifyFileLocation(numFiles - 1, 2);
cluster.restartNameNodes(); cluster.restartNameNodes();
cluster.waitActive(); cluster.waitActive();
verifyFileLocation(numFiles - 1); verifyFileLocation(numFiles - 1, 2);
} }
/** /**
@ -640,18 +643,21 @@ public void testNamenodeRestart() throws Exception {
* @param fileIndex the index of the file to verify. * @param fileIndex the index of the file to verify.
* @throws Exception * @throws Exception
*/ */
private void verifyFileLocation(int fileIndex) private void verifyFileLocation(int fileIndex, int replication)
throws Exception { throws Exception {
DataNode providedDatanode = cluster.getDataNodes().get(0);
DFSClient client = new DFSClient( DFSClient client = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()), new InetSocketAddress("localhost", cluster.getNameNodePort()),
cluster.getConfiguration(0)); cluster.getConfiguration(0));
if (fileIndex <= numFiles && fileIndex >= 0) { if (fileIndex < numFiles && fileIndex >= 0) {
String filename = "/" + filePrefix + fileIndex + fileSuffix; String filename = filePrefix + fileIndex + fileSuffix;
DatanodeInfo[] dnInfos = getAndCheckBlockLocations(client, filename, 1); File file = new File(new Path(NAMEPATH, filename).toUri());
// location should be the provided DN long fileLen = file.length();
assertEquals(providedDatanode.getDatanodeUuid(), long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
dnInfos[0].getDatanodeUuid()); FixedBlockResolver.BLOCKSIZE_DEFAULT);
long numLocatedBlocks =
fileLen == 0 ? 1 : (long) Math.ceil(fileLen * 1.0 / blockSize);
getAndCheckBlockLocations(client, "/" + filename, fileLen,
numLocatedBlocks, replication);
} }
} }
@ -669,4 +675,55 @@ public void testSetClusterID() throws Exception {
NameNode nn = cluster.getNameNode(); NameNode nn = cluster.getNameNode();
assertEquals(clusterID, nn.getNamesystem().getClusterId()); assertEquals(clusterID, nn.getNamesystem().getClusterId());
} }
@Test(timeout=30000)
public void testNumberOfProvidedLocations() throws Exception {
// set default replication to 4
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
// start with 4 PROVIDED location
startCluster(NNDIRPATH, 4,
new StorageType[]{
StorageType.PROVIDED, StorageType.DISK},
null,
false);
int expectedLocations = 4;
for (int i = 0; i < numFiles; i++) {
verifyFileLocation(i, expectedLocations);
}
// stop 2 datanodes, one after the other and verify number of locations.
for (int i = 1; i <= 2; i++) {
DataNode dn = cluster.getDataNodes().get(0);
cluster.stopDataNode(0);
// make NameNode detect that datanode is down
BlockManagerTestUtil.noticeDeadDatanode(cluster.getNameNode(),
dn.getDatanodeId().getXferAddr());
expectedLocations = 4 - i;
for (int j = 0; j < numFiles; j++) {
verifyFileLocation(j, expectedLocations);
}
}
}
@Test(timeout=30000)
public void testNumberOfProvidedLocationsManyBlocks() throws Exception {
// increase number of blocks per file to at least 10 blocks per file
conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen/10);
// set default replication to 4
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);
// start with 4 PROVIDED location
startCluster(NNDIRPATH, 4,
new StorageType[]{
StorageType.PROVIDED, StorageType.DISK},
null,
false);
int expectedLocations = 4;
for (int i = 0; i < numFiles; i++) {
verifyFileLocation(i, expectedLocations);
}
}
} }