HDFS-12809. [READ] Fix the randomized selection of locations in {{ProvidedBlocksBuilder}}.

Virajith Jalaparti 2017-11-27 17:04:20 -08:00 committed by Chris Douglas
parent 3d3be87e30
commit 4d59dabb7f
2 changed files with 61 additions and 77 deletions
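Before this patch, ProvidedDescriptor picked a datanode by drawing a random UUID and taking dns.ceilingEntry() on the skip-list map. Each datanode is then chosen with probability proportional to the size of the key-space gap below its key, which is exactly the non-uniformity the old TODO comment below admits ("skewed toward sparse sections of the ids"). The standalone demo below reproduces that skew with small integer keys standing in for UUIDs; it is illustrative only, not HDFS code. The patch replaces this scheme with a swap-based draw over an auxiliary list, sketched after the choose/chooseRandom hunk further down.

import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.TreeMap;

public class CeilingSkewDemo {
  public static void main(String[] args) {
    // Three "datanodes" with clustered keys: 10 and 11 are adjacent, while
    // 100 sits after a large gap, like a sparse section of the UUID space.
    NavigableMap<Integer, String> dns = new TreeMap<>();
    dns.put(10, "dn-a");
    dns.put(11, "dn-b");
    dns.put(100, "dn-c");

    Map<String, Integer> hits = new TreeMap<>();
    Random r = new Random(42);
    for (int i = 0; i < 100_000; i++) {
      // Same shape as dns.ceilingEntry(UUID.randomUUID().toString()),
      // wrapping around to firstEntry() past the largest key.
      Map.Entry<Integer, String> e = dns.ceilingEntry(r.nextInt(128));
      if (e == null) {
        e = dns.firstEntry();
      }
      hits.merge(e.getValue(), 1, Integer::sum);
    }
    // dn-c "owns" keys 12..100 and wins ~69% of the draws; dn-b, which
    // owns only key 11, wins under 1%. A uniform pick would give ~33% each.
    System.out.println(hits);
  }
}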


@@ -19,11 +19,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -229,11 +230,8 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
sids.add(currInfo.getStorageID());
types.add(storageType);
if (StorageType.PROVIDED.equals(storageType)) {
DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
locs.add(
new DatanodeInfoWithStorage(
dn, currInfo.getStorageID(), currInfo.getStorageType()));
excludedUUids.add(dn.getDatanodeUuid());
// Provided location will be added to the list of locations after
// examining all local locations.
isProvidedBlock = true;
} else {
locs.add(new DatanodeInfoWithStorage(
@@ -245,11 +243,17 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
int numLocations = locs.size();
if (isProvidedBlock) {
// add the first datanode here
DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
locs.add(
new DatanodeInfoWithStorage(dn, storageId, StorageType.PROVIDED));
excludedUUids.add(dn.getDatanodeUuid());
numLocations++;
// add more replicas until we reach the defaultReplication
for (int count = numLocations + 1;
count <= defaultReplication && count <= providedDescriptor
.activeProvidedDatanodes(); count++) {
DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
dn = chooseProvidedDatanode(excludedUUids);
locs.add(new DatanodeInfoWithStorage(
dn, storageId, StorageType.PROVIDED));
sids.add(storageId);
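Net effect of the two hunks above: while scanning a block's storages, a PROVIDED storage now only sets a flag, and provided locations are appended after all local locations have been collected. The first provided datanode is always added; further ones are added until the list reaches defaultReplication or the active provided datanodes are exhausted, with excludedUUids preventing repeats. A simplified, self-contained sketch of that control flow follows; plain UUID strings and the pickProvided helper are illustrative stand-ins for the HDFS types and chooseProvidedDatanode, not the real signatures.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeferredProvidedLocations {

  // Stand-in for chooseProvidedDatanode(excludedUUids): return any
  // provided datanode whose UUID is not excluded, or null if none is left.
  static String pickProvided(List<String> provided, Set<String> excluded) {
    for (String uuid : provided) {
      if (!excluded.contains(uuid)) {
        return uuid;
      }
    }
    return null;
  }

  static List<String> buildLocations(List<String> localUuids,
      boolean isProvidedBlock, List<String> providedUuids,
      int defaultReplication) {
    List<String> locs = new ArrayList<>(localUuids); // local locations first
    Set<String> excluded = new HashSet<>(locs);
    if (isProvidedBlock) {
      // "add the first datanode here": one provided location is always
      // appended once the local scan is complete.
      String first = pickProvided(providedUuids, excluded);
      if (first != null) {
        locs.add(first);
        excluded.add(first);
        // Keep adding distinct provided datanodes until the replication
        // target is met or no unexcluded provided datanode remains.
        while (locs.size() < defaultReplication) {
          String dn = pickProvided(providedUuids, excluded);
          if (dn == null) {
            break;
          }
          locs.add(dn);
          excluded.add(dn);
        }
      }
    }
    return locs;
  }

  public static void main(String[] args) {
    // One local replica, replication 3, three active provided datanodes:
    // prints [local-1, p-1, p-2]
    System.out.println(buildLocations(List.of("local-1"), true,
        List.of("p-1", "p-2", "p-3"), 3));
  }
}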
@@ -284,6 +288,9 @@ public static class ProvidedDescriptor extends DatanodeDescriptor {
private final NavigableMap<String, DatanodeDescriptor> dns =
new ConcurrentSkipListMap<>();
// maintain a separate list of the datanodes with provided storage
// to efficiently choose Datanodes when required.
private final List<DatanodeDescriptor> dnR = new ArrayList<>();
public final static String NETWORK_LOCATION = "/REMOTE";
public final static String NAME = "PROVIDED";
@@ -300,8 +307,8 @@ public static class ProvidedDescriptor extends DatanodeDescriptor {
DatanodeStorageInfo getProvidedStorage(
DatanodeDescriptor dn, DatanodeStorage s) {
LOG.info("XXXXX adding Datanode " + dn.getDatanodeUuid());
dns.put(dn.getDatanodeUuid(), dn);
dnR.add(dn);
// TODO: maintain separate RPC ident per dn
return storageMap.get(s.getStorageID());
}
@@ -315,84 +322,42 @@ DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
}
DatanodeDescriptor choose(DatanodeDescriptor client) {
// exact match for now
DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null;
if (null == dn) {
dn = chooseRandom();
}
return dn;
return choose(client, Collections.<String>emptySet());
}
DatanodeDescriptor choose(DatanodeDescriptor client,
Set<String> excludedUUids) {
// exact match for now
DatanodeDescriptor dn = client != null ?
dns.get(client.getDatanodeUuid()) : null;
if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
dn = null;
Set<String> exploredUUids = new HashSet<String>();
while(exploredUUids.size() < dns.size()) {
Map.Entry<String, DatanodeDescriptor> d =
dns.ceilingEntry(UUID.randomUUID().toString());
if (null == d) {
d = dns.firstEntry();
}
String uuid = d.getValue().getDatanodeUuid();
//this node has already been explored, and was not selected earlier
if (exploredUUids.contains(uuid)) {
continue;
}
exploredUUids.add(uuid);
//this node has been excluded
if (excludedUUids.contains(uuid)) {
continue;
}
return dns.get(uuid);
if (client != null && !excludedUUids.contains(client.getDatanodeUuid())) {
DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
if (dn != null) {
return dn;
}
}
return dn;
}
DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
// TODO: Currently this is not uniformly random;
// skewed toward sparse sections of the ids
Set<DatanodeDescriptor> excludedNodes =
new HashSet<DatanodeDescriptor>();
if (excludedStorages != null) {
for (int i= 0; i < excludedStorages.length; i++) {
LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
Random r = new Random();
for (int i = dnR.size() - 1; i >= 0; --i) {
int pos = r.nextInt(i + 1);
DatanodeDescriptor node = dnR.get(pos);
String uuid = node.getDatanodeUuid();
if (!excludedUUids.contains(uuid)) {
return node;
}
}
Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
while(exploredNodes.size() < dns.size()) {
Map.Entry<String, DatanodeDescriptor> d =
dns.ceilingEntry(UUID.randomUUID().toString());
if (null == d) {
d = dns.firstEntry();
}
DatanodeDescriptor node = d.getValue();
//this node has already been explored, and was not selected earlier
if (exploredNodes.contains(node)) {
continue;
}
exploredNodes.add(node);
//this node has been excluded
if (excludedNodes.contains(node)) {
continue;
}
return node;
Collections.swap(dnR, i, pos);
}
return null;
}
DatanodeDescriptor chooseRandom() {
return chooseRandom(null);
DatanodeDescriptor chooseRandom(DatanodeStorageInfo... excludedStorages) {
Set<String> excludedNodes = new HashSet<>();
if (excludedStorages != null) {
for (int i = 0; i < excludedStorages.length; i++) {
DatanodeDescriptor dn = excludedStorages[i].getDatanodeDescriptor();
String uuid = dn.getDatanodeUuid();
excludedNodes.add(uuid);
}
}
return choose(null, excludedNodes);
}
@Override
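The rewritten choose above is the heart of the fix. The auxiliary dnR list introduced a few hunks up (an ArrayList kept in sync with the dns map) allows O(1) access by index, which the skip-list map cannot offer. The loop is then a partial Fisher-Yates shuffle: draw a uniformly random index in the live prefix [0, i]; return that node if it is not excluded, otherwise swap it into the retired tail so it is never drawn again. This terminates after at most dnR.size() draws and picks uniformly among the non-excluded datanodes. A self-contained sketch of the same loop, with a plain list of UUID strings standing in for the descriptor list:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class PartialShuffleChooser {

  // Mirrors the loop in the patched choose(): pick uniformly at random
  // from the not-yet-rejected prefix; rejected (excluded) candidates are
  // swapped past index i and never inspected again.
  static String chooseRandom(List<String> nodes, Set<String> excluded,
      Random r) {
    for (int i = nodes.size() - 1; i >= 0; --i) {
      int pos = r.nextInt(i + 1);       // uniform over the live prefix
      String candidate = nodes.get(pos);
      if (!excluded.contains(candidate)) {
        return candidate;
      }
      Collections.swap(nodes, i, pos);  // retire the excluded candidate
    }
    return null;                        // every node was excluded
  }

  public static void main(String[] args) {
    List<String> dnR = new ArrayList<>(List.of("dn-a", "dn-b", "dn-c"));
    // Prints dn-a or dn-c with probability 1/2 each; never dn-b.
    System.out.println(chooseRandom(dnR, Set.of("dn-b"), new Random()));
  }
}

The swaps reorder dnR in place, but selection only needs set semantics from the list, so the changing order is harmless across calls.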
@@ -414,6 +379,7 @@ int remove(DatanodeDescriptor dnToRemove) {
DatanodeDescriptor storedDN = dns.get(dnToRemove.getDatanodeUuid());
if (storedDN != null) {
dns.remove(dnToRemove.getDatanodeUuid());
dnR.remove(dnToRemove);
}
}
return dns.size();


@@ -27,8 +27,11 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -480,16 +483,31 @@ private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
// given the start and length in the above call,
// only one LocatedBlock in LocatedBlocks
assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
assertEquals(expectedLocations, locatedBlock.getLocations().length);
return locatedBlock.getLocations();
DatanodeInfo[] locations =
locatedBlocks.getLocatedBlocks().get(0).getLocations();
assertEquals(expectedLocations, locations.length);
checkUniqueness(locations);
return locations;
}
/**
* Verify that the given locations are all unique.
* @param locations locations to check for uniqueness
*/
private void checkUniqueness(DatanodeInfo[] locations) {
Set<String> set = new HashSet<>();
for (DatanodeInfo info: locations) {
assertFalse("All locations should be unique",
set.contains(info.getDatanodeUuid()));
set.add(info.getDatanodeUuid());
}
}
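checkUniqueness pins down the property the new selection must provide: no datanode may appear twice among a block's locations. HashSet.add returns false when the element is already present, so the lookup and the insertion can be collapsed into one call. A sketch of that equivalent variant, assuming the same test class and its existing DatanodeInfo and JUnit imports:

// Drop-in variant (requires org.junit.Assert.assertTrue): Set.add reports
// whether the element was new, so the duplicate test and the bookkeeping
// happen in a single call.
private void checkUniqueness(DatanodeInfo[] locations) {
  Set<String> seen = new HashSet<>();
  for (DatanodeInfo info : locations) {
    assertTrue("All locations should be unique",
        seen.add(info.getDatanodeUuid()));
  }
}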
/**
* Tests setting replication of provided files.
* @throws Exception
*/
@Test(timeout=30000)
@Test(timeout=50000)
public void testSetReplicationForProvidedFiles() throws Exception {
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
FixedBlockResolver.class);