diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index aaa5ae3f14..fc8bf52c1e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -859,7 +859,7 @@ protected int getWeight(Node reader, Node node) {
     // Start off by initializing to off rack
     int weight = 2;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameRack(reader, node)) {
         weight = 1;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index 13160ebba0..3de49dcbea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -254,7 +254,7 @@ protected int getWeight(Node reader, Node node) {
     // Start off by initializing to off rack
     int weight = 3;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameNodeGroup(reader, node)) {
         weight = 1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eda3744e6f..4396e3d14a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
     HdfsDataInputStream. (yliu)
 
+    HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
+    but not StorageIDs. (Milan Desai via Arpit Agarwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 30368f6edd..7fb2e30948 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -41,11 +42,13 @@ public class LocatedBlock {
 
   private final ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
-  private final DatanodeInfo[] locs;
-  /** Storage ID for each replica */
-  private final String[] storageIDs;
-  // Storage type for each replica, if reported.
-  private final StorageType[] storageTypes;
+  private final DatanodeInfoWithStorage[] locs;
+  private final boolean hasStorageIDs;
+  private final boolean hasStorageTypes;
+  /** Cached storage ID for each replica */
+  private String[] storageIDs;
+  /** Cached storage type for each replica, if reported. */
+  private StorageType[] storageTypes;
   // corrupt flag is true if all of the replicas of a block are corrupt.
   // else false. If block has few corrupt replicas, they are filtered and
   // their locations are not part of this object
@@ -57,7 +60,8 @@ public class LocatedBlock {
   private DatanodeInfo[] cachedLocs;
 
   // Used when there are no locations
-  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
+  private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+      new DatanodeInfoWithStorage[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
@@ -94,10 +98,22 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
     if (locs==null) {
       this.locs = EMPTY_LOCS;
     } else {
-      this.locs = locs;
+      this.locs = new DatanodeInfoWithStorage[locs.length];
+      for(int i = 0; i < locs.length; i++) {
+        DatanodeInfo di = locs[i];
+        DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
+            storageIDs != null ? storageIDs[i] : null,
+            storageTypes != null ? storageTypes[i] : null);
+        storage.setDependentHostNames(di.getDependentHostNames());
+        storage.setLevel(di.getLevel());
+        storage.setParent(di.getParent());
+        this.locs[i] = storage;
+      }
     }
     this.storageIDs = storageIDs;
     this.storageTypes = storageTypes;
+    this.hasStorageIDs = storageIDs != null;
+    this.hasStorageTypes = storageTypes != null;
 
     if (cachedLocs == null || cachedLocs.length == 0) {
       this.cachedLocs = EMPTY_LOCS;
@@ -118,18 +134,53 @@ public ExtendedBlock getBlock() {
     return b;
   }
 
-  public DatanodeInfo[] getLocations() {
+  /**
+   * Returns the locations associated with this block. The returned array is not
+   * expected to be modified. If it is, caller must immediately invoke
+   * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
+   * to invalidate the cached Storage ID/Type arrays.
+   */
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
   public StorageType[] getStorageTypes() {
+    if(!hasStorageTypes) {
+      return null;
+    }
+    if(storageTypes != null) {
+      return storageTypes;
+    }
+    storageTypes = new StorageType[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageTypes[i] = locs[i].getStorageType();
+    }
     return storageTypes;
   }
 
   public String[] getStorageIDs() {
+    if(!hasStorageIDs) {
+      return null;
+    }
+    if(storageIDs != null) {
+      return storageIDs;
+    }
+    storageIDs = new String[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageIDs[i] = locs[i].getStorageID();
+    }
     return storageIDs;
   }
 
+  /**
+   * Invalidates the cached StorageID and StorageType information. Must be
+   * called when the locations array is modified.
+   */
+  public void invalidateCachedStorageInfo() {
+    storageIDs = null;
+    storageTypes = null;
+  }
+
   public long getStartOffset() {
     return offset;
   }
@@ -161,9 +212,9 @@ public void addCachedLoc(DatanodeInfo loc) {
       return;
     }
     // Try to re-use a DatanodeInfo already in loc
-    for (int i=0; i<locs.length; i++) {

       ArrayList<String> nodes = new ArrayList<String>();
       ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
-
+
+      DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
       for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
         DatanodeInfo found = datanodeInfo;
         for (DatanodeInfo dif: dnInfos4LastBlock) {
           if (datanodeInfo.equals(dif)) {
-            found = null;
+            found = null;
           }
         }
         if (found != null) {
           nodes.add(found.getXferAddr());
-          dnInfos.add(found);
+          dnInfos.add(dm.getDatanode(found));
         }
       }
       //decommission one of the 3 nodes which have last block
       nodes.add(dnInfos4LastBlock[0].getXferAddr());
-      dnInfos.add(dnInfos4LastBlock[0]);
+      dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
       writeConfigFile(excludeFile, nodes);
       refreshNodes(ns, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 2c65fff8b7..adf31a0408 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,13 +32,19 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 public class TestDatanodeManager {
@@ -210,4 +217,81 @@ public void reloadCachedMappings() {
     public void reloadCachedMappings(List<String> names) {
     }
   }
+
+  /**
+   * This test creates a LocatedBlock with 5 locations, sorts the locations
+   * based on the network topology, and ensures the locations are still aligned
+   * with the storage IDs and storage types.
+   */
+  @Test
+  public void testSortLocatedBlocks() throws IOException {
+    // create the DatanodeManager which will be tested
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
+        fsn, new Configuration());
+
+    // register 5 datanodes, each with different storage ID and type
+    DatanodeInfo[] locs = new DatanodeInfo[5];
+    String[] storageIDs = new String[5];
+    StorageType[] storageTypes = new StorageType[]{
+        StorageType.ARCHIVE,
+        StorageType.DEFAULT,
+        StorageType.DISK,
+        StorageType.RAM_DISK,
+        StorageType.SSD
+    };
+    for(int i = 0; i < 5; i++) {
+      // register new datanode
+      String uuid = "UUID-"+i;
+      String ip = "IP-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
+      Mockito.when(dr.getXferPort()).thenReturn(9000);
+      Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
+      dm.registerDatanode(dr);
+
+      // get location and storage information
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-"+i;
+    }
+
+    // set first 2 locations as decommissioned
+    locs[0].setDecommissioned();
+    locs[1].setDecommissioned();
+
+    // create LocatedBlock with above locations
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    final String targetIp = locs[4].getIpAddr();
+
+    // sort block locations
+    dm.sortLocatedBlocks(targetIp, blocks);
+
+    // check that storage IDs/types are aligned with datanode locs
+    DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
+    storageIDs = block.getStorageIDs();
+    storageTypes = block.getStorageTypes();
+    assertThat(sortedLocs.length, is(5));
+    assertThat(storageIDs.length, is(5));
+    assertThat(storageTypes.length, is(5));
+    for(int i = 0; i < sortedLocs.length; i++) {
+      assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
+      assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
+    }
+
+    // Ensure the local node is first.
+    assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
+
+    // Ensure the two decommissioned DNs were moved to the end.
+    assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+  }
 }
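
Illustrative note (not part of the patch): after this change, LocatedBlock#getLocations() returns DatanodeInfoWithStorage entries that each carry their own storage ID and type, and getStorageIDs()/getStorageTypes() are rebuilt lazily from that array, so the three stay index-aligned. Per the getLocations() javadoc above, any caller that reorders the returned array in place must call invalidateCachedStorageInfo() afterwards. The sketch below shows that contract; the class name ReorderSketch and the pre-built block argument are hypothetical and used only for illustration.

import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;

class ReorderSketch {
  /** Reverses the replica order of a block and keeps the cached arrays consistent. */
  static void reverseLocations(LocatedBlock block) {
    DatanodeInfoWithStorage[] locs = block.getLocations();
    for (int i = 0, j = locs.length - 1; i < j; i++, j--) {
      DatanodeInfoWithStorage tmp = locs[i];
      locs[i] = locs[j];
      locs[j] = tmp;
    }
    // The locations array was modified in place, so drop the cached
    // storage ID/type arrays; they are recomputed from locs on the next
    // getStorageIDs()/getStorageTypes() call and stay aligned.
    block.invalidateCachedStorageInfo();

    String[] ids = block.getStorageIDs();  // may be null if IDs were not reported
    for (int i = 0; ids != null && i < locs.length; i++) {
      // alignment still holds after the reorder
      assert ids[i].equals(locs[i].getStorageID());
    }
  }
}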