diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 6fec97706e..c85eb2ca52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -19,11 +19,12 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -229,11 +230,8 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
       sids.add(currInfo.getStorageID());
       types.add(storageType);
       if (StorageType.PROVIDED.equals(storageType)) {
-        DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
-        locs.add(
-            new DatanodeInfoWithStorage(
-                dn, currInfo.getStorageID(), currInfo.getStorageType()));
-        excludedUUids.add(dn.getDatanodeUuid());
+        // Provided location will be added to the list of locations after
+        // examining all local locations.
         isProvidedBlock = true;
       } else {
         locs.add(new DatanodeInfoWithStorage(
@@ -245,11 +243,17 @@ LocatedBlock newLocatedBlock(ExtendedBlock eb,
     int numLocations = locs.size();
     if (isProvidedBlock) {
+      // add the first datanode here
+      DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+      locs.add(
+          new DatanodeInfoWithStorage(dn, storageId, StorageType.PROVIDED));
+      excludedUUids.add(dn.getDatanodeUuid());
+      numLocations++;
       // add more replicas until we reach the defaultReplication
       for (int count = numLocations + 1;
           count <= defaultReplication && count <= providedDescriptor
               .activeProvidedDatanodes(); count++) {
-        DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+        dn = chooseProvidedDatanode(excludedUUids);
         locs.add(new DatanodeInfoWithStorage(
             dn, storageId, StorageType.PROVIDED));
         sids.add(storageId);
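Note on the two hunks above: newLocatedBlock now defers the PROVIDED replica until all local locations have been examined, appends a single provided location, and then pads with additional provided datanodes until defaultReplication is reached. A simplified, standalone sketch of that assembly order (hypothetical names; the real method also tracks storage IDs and types and threads excludedUUids through chooseProvidedDatanode):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: local replicas first, provided replicas appended after them. */
class LocationOrderSketch {
  static List<String> assembleLocations(List<String> localReplicas,
      List<String> providedDatanodes, int defaultReplication) {
    List<String> locs = new ArrayList<>(localReplicas); // local copies first
    int i = 0;
    if (!providedDatanodes.isEmpty()) {
      locs.add(providedDatanodes.get(i++)); // the one guaranteed provided location
    }
    // add more provided replicas until we reach defaultReplication
    while (locs.size() < defaultReplication && i < providedDatanodes.size()) {
      locs.add(providedDatanodes.get(i++));
    }
    return locs;
  }

  public static void main(String[] args) {
    // prints [local-dn, provided-dn1, provided-dn2]
    System.out.println(assembleLocations(Arrays.asList("local-dn"),
        Arrays.asList("provided-dn1", "provided-dn2"), 3));
  }
}

Because the provided location now comes after the local ones, a client that walks the returned array in order tries local replicas before falling back to remote, provided storage.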
@@ -284,6 +288,9 @@ public static class ProvidedDescriptor extends DatanodeDescriptor {
 
     private final NavigableMap<String, DatanodeDescriptor> dns =
         new ConcurrentSkipListMap<>();
+    // maintain a separate list of the datanodes with provided storage
+    // to efficiently choose Datanodes when required.
+    private final List<DatanodeDescriptor> dnR = new ArrayList<>();
     public final static String NETWORK_LOCATION = "/REMOTE";
     public final static String NAME = "PROVIDED";
 
@@ -300,8 +307,8 @@ public static class ProvidedDescriptor extends DatanodeDescriptor {
 
     DatanodeStorageInfo getProvidedStorage(
         DatanodeDescriptor dn, DatanodeStorage s) {
-      LOG.info("XXXXX adding Datanode " + dn.getDatanodeUuid());
       dns.put(dn.getDatanodeUuid(), dn);
+      dnR.add(dn);
       // TODO: maintain separate RPC ident per dn
       return storageMap.get(s.getStorageID());
     }
@@ -315,84 +322,42 @@ DatanodeStorageInfo createProvidedStorage(DatanodeStorage ds) {
     }
 
     DatanodeDescriptor choose(DatanodeDescriptor client) {
-      // exact match for now
-      DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
-      if (null == dn) {
-        dn = chooseRandom();
-      }
-      return dn;
+      return choose(client, Collections.emptySet());
     }
 
     DatanodeDescriptor choose(DatanodeDescriptor client,
         Set<String> excludedUUids) {
       // exact match for now
-      DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
-
-      if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
-        dn = null;
-        Set<String> exploredUUids = new HashSet<String>();
-
-        while(exploredUUids.size() < dns.size()) {
-          Map.Entry<String, DatanodeDescriptor> d =
-              dns.ceilingEntry(UUID.randomUUID().toString());
-          if (null == d) {
-            d = dns.firstEntry();
-          }
-          String uuid = d.getValue().getDatanodeUuid();
-          //this node has already been explored, and was not selected earlier
-          if (exploredUUids.contains(uuid)) {
-            continue;
-          }
-          exploredUUids.add(uuid);
-          //this node has been excluded
-          if (excludedUUids.contains(uuid)) {
-            continue;
-          }
-          return dns.get(uuid);
+      if (client != null && !excludedUUids.contains(client.getDatanodeUuid())) {
+        DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+        if (dn != null) {
+          return dn;
+        }
       }
-      return dn;
-    }
-
-    DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
-      // TODO: Currently this is not uniformly random;
-      // skewed toward sparse sections of the ids
-      Set<DatanodeDescriptor> excludedNodes =
-          new HashSet<DatanodeDescriptor>();
-      if (excludedStorages != null) {
-        for (int i= 0; i < excludedStorages.length; i++) {
-          LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
-          excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
+      Random r = new Random();
+      for (int i = dnR.size() - 1; i >= 0; --i) {
+        int pos = r.nextInt(i + 1);
+        DatanodeDescriptor node = dnR.get(pos);
+        String uuid = node.getDatanodeUuid();
+        if (!excludedUUids.contains(uuid)) {
+          return node;
         }
-      }
-      Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
-
-      while(exploredNodes.size() < dns.size()) {
-        Map.Entry<String, DatanodeDescriptor> d =
-            dns.ceilingEntry(UUID.randomUUID().toString());
-        if (null == d) {
-          d = dns.firstEntry();
-        }
-        DatanodeDescriptor node = d.getValue();
-        //this node has already been explored, and was not selected earlier
-        if (exploredNodes.contains(node)) {
-          continue;
-        }
-        exploredNodes.add(node);
-        //this node has been excluded
-        if (excludedNodes.contains(node)) {
-          continue;
-        }
-        return node;
+        Collections.swap(dnR, i, pos);
      }
      return null;
    }
 
-    DatanodeDescriptor chooseRandom() {
-      return chooseRandom(null);
+    DatanodeDescriptor chooseRandom(DatanodeStorageInfo... excludedStorages) {
+      Set<String> excludedNodes = new HashSet<>();
+      if (excludedStorages != null) {
+        for (int i = 0; i < excludedStorages.length; i++) {
+          DatanodeDescriptor dn = excludedStorages[i].getDatanodeDescriptor();
+          String uuid = dn.getDatanodeUuid();
+          excludedNodes.add(uuid);
+        }
+      }
+      return choose(null, excludedNodes);
     }
 
     @Override
@@ -414,6 +379,7 @@ int remove(DatanodeDescriptor dnToRemove) {
         DatanodeDescriptor storedDN = dns.get(dnToRemove.getDatanodeUuid());
         if (storedDN != null) {
          dns.remove(dnToRemove.getDatanodeUuid());
+          dnR.remove(dnToRemove);
         }
       }
       return dns.size();
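Note on the rewritten choose above: the old implementation probed dns.ceilingEntry(UUID.randomUUID().toString()), which its own TODO admitted was skewed toward sparse sections of the UUID space. The replacement draws uniformly from the unexamined prefix of dnR and swaps excluded picks out of the shrinking window, a partial Fisher-Yates shuffle, so each datanode is examined at most once and, by symmetry, every non-excluded datanode is equally likely to be returned. A minimal standalone sketch of the same technique (plain strings stand in for DatanodeDescriptor):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

class SwapOutSample {
  /**
   * Return a random element of candidates that is not excluded, or null if
   * every candidate is excluded. Each iteration samples uniformly from the
   * window [0, i]; an excluded pick is swapped to position i and the window
   * shrinks, so no element is examined twice.
   */
  static String chooseRandom(List<String> candidates, Set<String> excluded) {
    Random r = new Random();
    for (int i = candidates.size() - 1; i >= 0; --i) {
      int pos = r.nextInt(i + 1);
      String node = candidates.get(pos);
      if (!excluded.contains(node)) {
        return node;
      }
      Collections.swap(candidates, i, pos); // retire the excluded element
    }
    return null; // all candidates were excluded
  }

  public static void main(String[] args) {
    List<String> dns = new ArrayList<>(Arrays.asList("dn-1", "dn-2", "dn-3"));
    // prints dn-1 or dn-3; the excluded dn-2 is never returned
    System.out.println(chooseRandom(dns, Collections.singleton("dn-2")));
  }
}

One side effect worth noting: like the patch, the sketch reorders the underlying list as it searches, so a caller that depended on a stable order of dnR would need to shuffle a copy instead.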
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index 9c82967310..09e8f97141 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -27,8 +27,11 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.Set;
+
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -480,16 +483,31 @@ private DatanodeInfo[] getAndCheckBlockLocations(DFSClient client,
     // given the start and length in the above call,
     // only one LocatedBlock in LocatedBlocks
     assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
-    LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
-    assertEquals(expectedLocations, locatedBlock.getLocations().length);
-    return locatedBlock.getLocations();
+    DatanodeInfo[] locations =
+        locatedBlocks.getLocatedBlocks().get(0).getLocations();
+    assertEquals(expectedLocations, locations.length);
+    checkUniqueness(locations);
+    return locations;
+  }
+
+  /**
+   * Verify that the given locations are all unique.
+   * @param locations the locations to check.
+   */
+  private void checkUniqueness(DatanodeInfo[] locations) {
+    Set<String> set = new HashSet<>();
+    for (DatanodeInfo info : locations) {
+      assertFalse("All locations should be unique",
+          set.contains(info.getDatanodeUuid()));
+      set.add(info.getDatanodeUuid());
+    }
   }
 
   /**
    * Tests setting replication of provided files.
    * @throws Exception
    */
-  @Test(timeout=30000)
+  @Test(timeout=50000)
   public void testSetReplicationForProvidedFiles() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);
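The new checkUniqueness helper asserts that no datanode UUID repeats, failing at the first duplicate it encounters. An equivalent formulation (a hypothetical alternative, not part of the patch) compares the set size to the array length:

import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import static org.junit.Assert.assertEquals;

final class LocationAsserts {
  /** A duplicate UUID makes the set smaller than the array. */
  static void assertLocationsUnique(DatanodeInfo[] locations) {
    Set<String> uuids = new HashSet<>();
    for (DatanodeInfo info : locations) {
      uuids.add(info.getDatanodeUuid());
    }
    assertEquals("All locations should be unique",
        locations.length, uuids.size());
  }
}

The element-by-element version in the patch has the advantage of stopping at the offending location, which keeps the failure closer to the duplicate when a test breaks.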