HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)

This commit is contained in:
Arpit Agarwal 2015-02-09 12:17:40 -08:00
parent 241336ca2b
commit ab934e8594
8 changed files with 218 additions and 21 deletions

View File

@ -859,7 +859,7 @@ protected int getWeight(Node reader, Node node) {
// Start off by initializing to off rack
int weight = 2;
if (reader != null) {
if (reader == node) {
if (reader.equals(node)) {
weight = 0;
} else if (isOnSameRack(reader, node)) {
weight = 1;

View File

@ -254,7 +254,7 @@ protected int getWeight(Node reader, Node node) {
// Start off by initializing to off rack
int weight = 3;
if (reader != null) {
if (reader == node) {
if (reader.equals(node)) {
weight = 0;
} else if (isOnSameNodeGroup(reader, node)) {
weight = 1;

View File

@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
HdfsDataInputStream. (yliu)
HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
but not StorageIDs. (Milan Desai via Arpit Agarwal)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
@ -41,11 +42,13 @@ public class LocatedBlock {
private final ExtendedBlock b;
private long offset; // offset of the first byte of the block in the file
private final DatanodeInfo[] locs;
/** Storage ID for each replica */
private final String[] storageIDs;
// Storage type for each replica, if reported.
private final StorageType[] storageTypes;
private final DatanodeInfoWithStorage[] locs;
private final boolean hasStorageIDs;
private final boolean hasStorageTypes;
/** Cached storage ID for each replica */
private String[] storageIDs;
/** Cached storage type for each replica, if reported. */
private StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
@ -57,7 +60,8 @@ public class LocatedBlock {
private DatanodeInfo[] cachedLocs;
// Used when there are no locations
private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
new DatanodeInfoWithStorage[0];
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1, false); // startOffset is unknown
@ -94,10 +98,22 @@ public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs,
if (locs==null) {
this.locs = EMPTY_LOCS;
} else {
this.locs = locs;
this.locs = new DatanodeInfoWithStorage[locs.length];
for(int i = 0; i < locs.length; i++) {
DatanodeInfo di = locs[i];
DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
storageIDs != null ? storageIDs[i] : null,
storageTypes != null ? storageTypes[i] : null);
storage.setDependentHostNames(di.getDependentHostNames());
storage.setLevel(di.getLevel());
storage.setParent(di.getParent());
this.locs[i] = storage;
}
}
this.storageIDs = storageIDs;
this.storageTypes = storageTypes;
this.hasStorageIDs = storageIDs != null;
this.hasStorageTypes = storageTypes != null;
if (cachedLocs == null || cachedLocs.length == 0) {
this.cachedLocs = EMPTY_LOCS;
@ -118,17 +134,52 @@ public ExtendedBlock getBlock() {
return b;
}
public DatanodeInfo[] getLocations() {
/**
* Returns the locations associated with this block. The returned array is not
* expected to be modified. If it is, caller must immediately invoke
* {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
* to invalidate the cached Storage ID/Type arrays.
*/
public DatanodeInfoWithStorage[] getLocations() {
return locs;
}
public StorageType[] getStorageTypes() {
if(!hasStorageTypes) {
return null;
}
if(storageTypes != null) {
return storageTypes;
}
storageTypes = new StorageType[locs.length];
for(int i = 0; i < locs.length; i++) {
storageTypes[i] = locs[i].getStorageType();
}
return storageTypes;
}
public String[] getStorageIDs() {
if(!hasStorageIDs) {
return null;
}
if(storageIDs != null) {
return storageIDs;
}
storageIDs = new String[locs.length];
for(int i = 0; i < locs.length; i++) {
storageIDs[i] = locs[i].getStorageID();
}
return storageIDs;
}
/**
* Invalidates the cached StorageID and StorageType information. Must be
* called when the locations array is modified.
*/
public void invalidateCachedStorageInfo() {
storageIDs = null;
storageTypes = null;
}
public long getStartOffset() {
return offset;
@ -161,9 +212,9 @@ public void addCachedLoc(DatanodeInfo loc) {
return;
}
// Try to re-use a DatanodeInfo already in loc
for (int i=0; i<locs.length; i++) {
if (locs[i].equals(loc)) {
cachedList.add(locs[i]);
for (DatanodeInfoWithStorage di : locs) {
if (loc.equals(di)) {
cachedList.add(di);
cachedLocs = cachedList.toArray(cachedLocs);
return;
}
@ -187,10 +238,6 @@ public String toString() {
+ "; corrupt=" + corrupt
+ "; offset=" + offset
+ "; locs=" + Arrays.asList(locs)
+ "; storageIDs=" +
(storageIDs != null ? Arrays.asList(storageIDs) : null)
+ "; storageTypes=" +
(storageTypes != null ? Arrays.asList(storageTypes) : null)
+ "}";
}
}

View File

@ -391,6 +391,8 @@ public void sortLocatedBlocks(final String targethost,
}
int activeLen = lastActiveIndex + 1;
networktopology.sortByDistance(client, b.getLocations(), activeLen);
// must invalidate cache since we modified locations array
b.invalidateCachedStorageInfo();
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
public class DatanodeInfoWithStorage extends DatanodeInfo {
private final String storageID;
private final StorageType storageType;
public DatanodeInfoWithStorage(DatanodeInfo from, String storageID,
StorageType storageType) {
super(from);
this.storageID = storageID;
this.storageType = storageType;
}
public String getStorageID() {
return storageID;
}
public StorageType getStorageType() {
return storageType;
}
@Override
public boolean equals(Object o) {
// allows this class to be used interchangeably with DatanodeInfo
return super.equals(o);
}
@Override
public int hashCode() {
// allows this class to be used interchangeably with DatanodeInfo
return super.hashCode();
}
@Override
public String toString() {
return "DatanodeInfoWithStorage[" + super.toString() + "," + storageID +
"," + storageType + "]";
}
}

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@ -802,6 +803,7 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedExcept
ArrayList<String> nodes = new ArrayList<String>();
ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
DatanodeInfo found = datanodeInfo;
for (DatanodeInfo dif: dnInfos4LastBlock) {
@ -811,12 +813,12 @@ public void testDecommissionWithOpenfile() throws IOException, InterruptedExcept
}
if (found != null) {
nodes.add(found.getXferAddr());
dnInfos.add(found);
dnInfos.add(dm.getDatanode(found));
}
}
//decommission one of the 3 nodes which have last block
nodes.add(dnInfos4LastBlock[0].getXferAddr());
dnInfos.add(dnInfos4LastBlock[0]);
dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
writeConfigFile(excludeFile, nodes);
refreshNodes(ns, conf);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -31,13 +32,19 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
public class TestDatanodeManager {
@ -210,4 +217,81 @@ public void reloadCachedMappings() {
public void reloadCachedMappings(List<String> names) {
}
}
/**
* This test creates a LocatedBlock with 5 locations, sorts the locations
* based on the network topology, and ensures the locations are still aligned
* with the storage ids and storage types.
*/
@Test
public void testSortLocatedBlocks() throws IOException {
// create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, new Configuration());
// register 5 datanodes, each with different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];
String[] storageIDs = new String[5];
StorageType[] storageTypes = new StorageType[]{
StorageType.ARCHIVE,
StorageType.DEFAULT,
StorageType.DISK,
StorageType.RAM_DISK,
StorageType.SSD
};
for(int i = 0; i < 5; i++) {
// register new datanode
String uuid = "UUID-"+i;
String ip = "IP-" + i;
DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
Mockito.when(dr.getIpAddr()).thenReturn(ip);
Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
Mockito.when(dr.getXferPort()).thenReturn(9000);
Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
dm.registerDatanode(dr);
// get location and storage information
locs[i] = dm.getDatanode(uuid);
storageIDs[i] = "storageID-"+i;
}
// set first 2 locations as decomissioned
locs[0].setDecommissioned();
locs[1].setDecommissioned();
// create LocatedBlock with above locations
ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
List<LocatedBlock> blocks = new ArrayList<>();
blocks.add(block);
final String targetIp = locs[4].getIpAddr();
// sort block locations
dm.sortLocatedBlocks(targetIp, blocks);
// check that storage IDs/types are aligned with datanode locs
DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
storageIDs = block.getStorageIDs();
storageTypes = block.getStorageTypes();
assertThat(sortedLocs.length, is(5));
assertThat(storageIDs.length, is(5));
assertThat(storageTypes.length, is(5));
for(int i = 0; i < sortedLocs.length; i++) {
assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
}
// Ensure the local node is first.
assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
// Ensure the two decommissioned DNs were moved to the end.
assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
is(DatanodeInfo.AdminStates.DECOMMISSIONED));
}
}