HDFS-17243. Add the parameter storage type for getBlocks method (#6238). Contributed by Haiyang Hu.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Tao Li <tomscut@apache.org>
Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
Author: huhaiyang
Date:   2023-11-06 11:20:25 +08:00 (committed by GitHub)
Parent: c15fd3b2c0
Commit: 4ef2322b6d
15 changed files with 142 additions and 36 deletions
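
In short, the change threads an optional StorageType parameter through NamenodeProtocol#getBlocks end to end (RBF router, protobuf translators, NameNodeRpcServer, FSNamesystem, BlockManager) so that a caller such as the Balancer can restrict the returned blocks to replicas held on a specific storage type of the queried datanode; passing null keeps the previous behaviour and returns blocks from every storage. A minimal sketch of a call against the updated interface (the helper class, proxy, datanode, and size values below are assumptions for illustration, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;

final class GetBlocksExample {
  /** Fetch up to {@code bytes} worth of blocks that have an SSD replica on {@code dn}. */
  static BlocksWithLocations ssdBlocks(NamenodeProtocol namenode, DatanodeInfo dn, long bytes)
      throws IOException {
    long minBlockSize = 0;          // no lower bound on block size
    long hotBlockTimeInterval = 0;  // do not skip recently accessed ("hot") blocks
    // The fifth argument is new in this patch; null would mean "any storage type".
    return namenode.getBlocks(dn, bytes, minBlockSize, hotBlockTimeInterval, StorageType.SSD);
  }
}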

RouterNamenodeProtocol.java

@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -53,7 +54,7 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     rpcServer.checkOperation(OperationCategory.READ);
     // Get the namespace where the datanode is located
@@ -79,8 +80,8 @@ public class RouterNamenodeProtocol implements NamenodeProtocol {
     if (nsId != null) {
       RemoteMethod method = new RemoteMethod(
           NamenodeProtocol.class, "getBlocks", new Class<?>[]
-          {DatanodeInfo.class, long.class, long.class, long.class},
-          datanode, size, minBlockSize, hotBlockTimeInterval);
+          {DatanodeInfo.class, long.class, long.class, long.class, StorageType.class},
+          datanode, size, minBlockSize, hotBlockTimeInterval, storageType);
       return rpcClient.invokeSingle(nsId, method, BlocksWithLocations.class);
     }
     return null;

RouterRpcServer.java

@@ -1612,9 +1612,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size,
-      long minBlockSize, long hotBlockTimeInterval) throws IOException {
+      long minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException {
     return nnProto.getBlocks(datanode, size, minBlockSize,
-        hotBlockTimeInterval);
+        hotBlockTimeInterval, storageType);
   }
 
   @Override // NamenodeProtocol

TestRouterRpc.java

@@ -1385,9 +1385,11 @@ public class TestRouterRpc {
     // Verify that checking that datanode works
     BlocksWithLocations routerBlockLocations =
-        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        routerNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+            null);
     BlocksWithLocations nnBlockLocations =
-        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0);
+        nnNamenodeProtocol.getBlocks(dn0, 1024, 0, 0,
+            null);
     BlockWithLocations[] routerBlocks = routerBlockLocations.getBlocks();
     BlockWithLocations[] nnBlocks = nnBlockLocations.getBlocks();
     assertEquals(nnBlocks.length, routerBlocks.length);

NamenodeProtocolServerSideTranslatorPB.java

@@ -89,7 +89,9 @@ public class NamenodeProtocolServerSideTranslatorPB implements
     BlocksWithLocations blocks;
     try {
       blocks = impl.getBlocks(dnInfo, request.getSize(),
-          request.getMinBlockSize(), request.getTimeInterval());
+          request.getMinBlockSize(), request.getTimeInterval(),
+          request.hasStorageType() ?
+              PBHelperClient.convertStorageType(request.getStorageType()): null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }

NamenodeProtocolTranslatorPB.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
@@ -101,11 +102,15 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
   @Override
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
-    GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
+    GetBlocksRequestProto.Builder builder = GetBlocksRequestProto.newBuilder()
         .setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
-        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval).build();
+        .setMinBlockSize(minBlockSize).setTimeInterval(timeInterval);
+    if (storageType != null) {
+      builder.setStorageType(PBHelperClient.convertStorageType(storageType));
+    }
+    GetBlocksRequestProto req = builder.build();
     return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
         .getBlocks()));
   }

Dispatcher.java

@@ -839,7 +839,7 @@ public class Dispatcher {
       final long size = Math.min(getBlocksSize, blocksToReceive);
       final BlocksWithLocations newBlksLocs =
           nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize,
-              hotBlockTimeInterval);
+              hotBlockTimeInterval, storageType);
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "

NameNodeConnector.java

@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -255,7 +256,7 @@ public class NameNodeConnector implements Closeable {
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval) throws IOException {
+      minBlockSize, long timeInterval, StorageType storageType) throws IOException {
     if (getBlocksRateLimiter != null) {
       getBlocksRateLimiter.acquire();
     }
@@ -274,7 +275,7 @@
       } else {
         nnProxy = namenode;
       }
-      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval);
+      return nnProxy.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
     } finally {
       if (isRequestStandby) {
         LOG.info("Request #getBlocks to Standby NameNode success. " +

BlockManager.java

@@ -1720,8 +1720,8 @@ public class BlockManager implements BlockStatsMXBean {
   /** Get all blocks with location information from a datanode. */
   public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
-      final long size, final long minBlockSize, final long timeInterval) throws
-      UnregisteredNodeException {
+      final long size, final long minBlockSize, final long timeInterval,
+      final StorageType storageType) throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
     if (node == null) {
       blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" +
@@ -1735,10 +1735,11 @@
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
-    // skip stale storage
+    // skip stale storage, then choose specific storage type.
     DatanodeStorageInfo[] storageInfos = Arrays
         .stream(node.getStorageInfos())
         .filter(s -> !s.areBlockContentsStale())
+        .filter(s -> storageType == null || s.getStorageType().equals(storageType))
        .toArray(DatanodeStorageInfo[]::new);
     // starting from a random block
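
This filter is the only place the new parameter is interpreted: a datanode's storages are dropped before block collection unless their type matches, and a null storageType matches every storage, so callers that pass null see the pre-patch behaviour unchanged. A small self-contained sketch of that predicate (class and method names below are illustrative, not part of the patch):

import org.apache.hadoop.fs.StorageType;

public final class StorageTypePredicate {
  /** Mirrors the filter above: a null request means "accept any storage type". */
  static boolean accept(StorageType requested, StorageType actual) {
    return requested == null || actual.equals(requested);
  }

  public static void main(String[] args) {
    System.out.println(accept(null, StorageType.DISK));            // true  (old behaviour)
    System.out.println(accept(StorageType.SSD, StorageType.DISK)); // false (filtered out)
    System.out.println(accept(StorageType.SSD, StorageType.SSD));  // true
  }
}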

FSNamesystem.java

@@ -1946,10 +1946,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *
    * @param datanode on which blocks are located
    * @param size total size of blocks
-   * @param minimumBlockSize
+   * @param minimumBlockSize each block should be of this minimum Block Size
    * @param timeInterval prefer to get blocks which are belong to
    *        the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    */
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
-      minimumBlockSize, long timeInterval) throws IOException {
+      minimumBlockSize, long timeInterval, StorageType storageType) throws IOException {
     OperationCategory checkOp =
         isGetBlocksCheckOperationEnabled ? OperationCategory.READ :
             OperationCategory.UNCHECKED;
@@ -1958,7 +1961,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     try {
       checkOperation(checkOp);
       return getBlockManager().getBlocksWithLocations(datanode, size,
-          minimumBlockSize, timeInterval);
+          minimumBlockSize, timeInterval, storageType);
     } finally {
       readUnlock("getBlocks");
     }

NameNodeRpcServer.java

@@ -649,7 +649,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   /////////////////////////////////////////////////////
   @Override // NamenodeProtocol
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long timeInterval)
+      minBlockSize, long timeInterval, StorageType storageType)
       throws IOException {
     String operationName = "getBlocks";
     if(size <= 0) {
@@ -663,7 +663,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     checkNNStartup();
     namesystem.checkSuperuserPrivilege(operationName);
     namesystem.checkNameNodeSafeMode("Cannot execute getBlocks");
-    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval);
+    return namesystem.getBlocks(datanode, size, minBlockSize, timeInterval, storageType);
   }
 
   @Override // NamenodeProtocol

NamenodeProtocol.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.protocol;
 import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -76,6 +77,7 @@ public interface NamenodeProtocol {
    * @param minBlockSize each block should be of this minimum Block Size
    * @param hotBlockTimeInterval prefer to get blocks which are belong to
    *        the cold files accessed before the time interval
+   * @param storageType the given storage type {@link StorageType}
    * @return BlocksWithLocations a list of blocks &amp; their locations
    * @throws IOException if size is less than or equal to 0 or
    *         datanode does not exist
@@ -83,7 +85,7 @@ public interface NamenodeProtocol {
   @Idempotent
   @ReadOnly
   BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
-      minBlockSize, long hotBlockTimeInterval) throws IOException;
+      minBlockSize, long hotBlockTimeInterval, StorageType storageType) throws IOException;
 
   /**
    * Get the current block keys

NamenodeProtocol.proto

@@ -48,6 +48,7 @@ message GetBlocksRequestProto {
   // For more info refer HDFS-13356
   optional uint64 minBlockSize = 3 [default = 10485760];
   optional uint64 timeInterval = 4 [default = 0];
+  optional StorageTypeProto storageType = 5;
 }
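
Because the field is optional, a request built without it (as any pre-patch client would build it) still round-trips cleanly: hasStorageType() is false on the server side and the translator above falls back to null, i.e. all storage types. A hedged sketch of the two request shapes, assuming the usual generated classes (NamenodeProtocolProtos, HdfsProtos) and letting the caller supply the DatanodeIDProto:

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;

final class GetBlocksRequests {
  /** Legacy-shaped request: the storageType field is simply absent. */
  static GetBlocksRequestProto legacy(DatanodeIDProto dn, long size) {
    return GetBlocksRequestProto.newBuilder()
        .setDatanode(dn)
        .setSize(size)
        .build();   // hasStorageType() == false -> server matches all storage types
  }

  /** New-shaped request: restrict the result to replicas on the given storage type. */
  static GetBlocksRequestProto forType(DatanodeIDProto dn, long size, StorageTypeProto type) {
    return legacy(dn, size).toBuilder()
        .setStorageType(type)
        .build();
  }
}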

TestGetBlocks.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.junit.Assert.*;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -36,6 +37,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -239,26 +241,29 @@ public class TestGetBlocks {
         DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
     // Should return all 13 blocks, as minBlockSize is not passed
-    locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
+    locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0,
+        null).getBlocks();
     assertEquals(blkLocsSize, locs.length);
     assertEquals(locs[0].getStorageIDs().length, replicationFactor);
     assertEquals(locs[1].getStorageIDs().length, replicationFactor);
     // Should return 12 blocks, as minBlockSize is blkSize
-    locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0).getBlocks();
+    locs = namenode.getBlocks(dataNodes[0], fileLen, blkSize, 0,
+        null).getBlocks();
     assertEquals(blkLocsSize - 1, locs.length);
     assertEquals(locs[0].getStorageIDs().length, replicationFactor);
     assertEquals(locs[1].getStorageIDs().length, replicationFactor);
     // get blocks of size BlockSize from dataNodes[0]
     locs = namenode.getBlocks(dataNodes[0], blkSize,
-        blkSize, 0).getBlocks();
+        blkSize, 0, null).getBlocks();
     assertEquals(locs.length, 1);
     assertEquals(locs[0].getStorageIDs().length, replicationFactor);
     // get blocks of size 1 from dataNodes[0]
-    locs = namenode.getBlocks(dataNodes[0], 1, 1, 0).getBlocks();
+    locs = namenode.getBlocks(dataNodes[0], 1, 1, 0,
+        null).getBlocks();
     assertEquals(locs.length, 1);
     assertEquals(locs[0].getStorageIDs().length, replicationFactor);
@@ -283,7 +288,8 @@
     // Namenode should refuse to provide block locations to the balancer
     // while in safemode.
-    locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0).getBlocks();
+    locs = namenode.getBlocks(dataNodes[0], fileLen, 0, 0,
+        null).getBlocks();
     assertEquals(blkLocsSize, locs.length);
     assertFalse(fs.isInSafeMode());
     LOG.info("Entering safe mode");
@@ -310,7 +316,8 @@
     // Namenode should refuse should fail
     LambdaTestUtils.intercept(exClass,
-        msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0));
+        msg, () -> namenode.getBlocks(datanode, size, minBlkSize, 0,
+        null));
   }
 
   /**
@@ -450,7 +457,7 @@
         .getBlockLocations(fileNew, 0, fileLen).getLocatedBlocks();
     BlockWithLocations[] locsAll = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+        dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks();
     assertEquals(locsAll.length, 4);
     for(int i = 0; i < blockNum; i++) {
@@ -461,7 +468,7 @@
     }
     BlockWithLocations[] locs2 = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, hotInterval).getBlocks();
+        dataNodes[0], fileLen*2, 0, hotInterval, null).getBlocks();
     for(int i = 0; i < 2; i++) {
       assertTrue(belongToFile(locs2[i], locatedBlocksOld));
     }
@@ -508,7 +515,7 @@
     // check blocks count equals to blockNum
     BlockWithLocations[] blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(blockNum, blocks.length);
     // calculate the block count on storage[0]
@@ -524,13 +531,94 @@
     // set storage[0] stale
     storageInfos[0].setBlockContentsStale(true);
     blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(blockNum - count, blocks.length);
     // set all storage stale
     bm0.getDatanodeManager().markAllDatanodesStale();
     blocks = namenode.getBlocks(
-        dataNodes[0], fileLen*2, 0, 0).getBlocks();
+        dataNodes[0], fileLen*2, 0, 0, null).getBlocks();
     assertEquals(0, blocks.length);
   }
 
+  @Test
+  public void testChooseSpecifyStorageType() throws Exception {
+    final short repFactor = (short) 1;
+    final int fileLen = BLOCK_SIZE;
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .storageTypes(new StorageType[] {StorageType.DISK, StorageType.SSD}).
+        storagesPerDatanode(2).build()) {
+      cluster.waitActive();
+
+      // Get storage info.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      DatanodeInfo[] dataNodes = client.getDatanodeReport(DatanodeReportType.ALL);
+      BlockManager bm0 = cluster.getNamesystem(0).getBlockManager();
+      DatanodeStorageInfo[] storageInfos = bm0.getDatanodeManager()
+          .getDatanode(dataNodes[0].getDatanodeUuid()).getStorageInfos();
+      assert Arrays.stream(storageInfos)
+          .anyMatch(datanodeStorageInfo -> {
+            String storageTypeName = datanodeStorageInfo.getStorageType().name();
+            return storageTypeName.equals("SSD") || storageTypeName.equals("DISK");
+          }) : "No 'SSD' or 'DISK' storage types found.";
+
+      // Create hdfs file.
+      Path ssdDir = new Path("/testChooseSSD");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path ssdFile = new Path(ssdDir, "file");
+      fs.mkdirs(ssdDir);
+      fs.setStoragePolicy(ssdDir, "ALL_SSD");
+      DFSTestUtil.createFile(fs, ssdFile, false, 1024, fileLen,
+          BLOCK_SIZE, repFactor, 0, true);
+      DFSTestUtil.waitReplication(fs, ssdFile, repFactor);
+      BlockLocation[] locations = fs.getClient()
+          .getBlockLocations(ssdFile.toUri().getPath(), 0, Long.MAX_VALUE);
+      assertEquals(1, locations.length);
+      assertEquals("SSD", locations[0].getStorageTypes()[0].name());
+
+      Path diskDir = new Path("/testChooseDisk");
+      fs = cluster.getFileSystem();
+      Path diskFile = new Path(diskDir, "file");
+      fs.mkdirs(diskDir);
+      fs.setStoragePolicy(diskDir, "HOT");
+      DFSTestUtil.createFile(fs, diskFile, false, 1024, fileLen,
+          BLOCK_SIZE, repFactor, 0, true);
+      DFSTestUtil.waitReplication(fs, diskFile, repFactor);
+      locations = fs.getClient()
+          .getBlockLocations(diskFile.toUri().getPath(), 0, Long.MAX_VALUE);
+      assertEquals(1, locations.length);
+      assertEquals("DISK", locations[0].getStorageTypes()[0].name());
+
+      InetSocketAddress addr = new InetSocketAddress("localhost",
+          cluster.getNameNodePort());
+      NamenodeProtocol namenode = NameNodeProxies.createProxy(conf,
+          DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
+
+      // Check blocks count equals to blockNum.
+      // If StorageType is not specified will get all blocks.
+      BlockWithLocations[] blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          null).getBlocks();
+      assertEquals(2, blocks.length);
+
+      // Check the count of blocks with a StorageType of DISK.
+      blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          StorageType.DISK).getBlocks();
+      assertEquals(1, blocks.length);
+      assertEquals("DISK", blocks[0].getStorageTypes()[0].name());
+
+      // Check the count of blocks with a StorageType of SSD.
+      blocks = namenode.getBlocks(
+          dataNodes[0], fileLen * 2, 0, 0,
+          StorageType.SSD).getBlocks();
+      assertEquals(1, blocks.length);
+      assertEquals("SSD", blocks[0].getStorageTypes()[0].name());
+    }
+  }
 }

TestBalancer.java

@@ -1891,7 +1891,7 @@ public class TestBalancer {
         numGetBlocksCalls.incrementAndGet();
         return blk;
       }}).when(fsnSpy).getBlocks(any(DatanodeID.class),
-          anyLong(), anyLong(), anyLong());
+          anyLong(), anyLong(), anyLong(), any(StorageType.class));
   }
 
   /**

TestBalancerWithHANameNodes.java

@@ -277,7 +277,7 @@ public class TestBalancerWithHANameNodes {
         int expectedObserverIdx = withObserverFailure ? 3 : 2;
         int expectedCount = (i == expectedObserverIdx) ? 2 : 0;
         verify(namesystemSpies.get(i), times(expectedCount))
-            .getBlocks(any(), anyLong(), anyLong(), anyLong());
+            .getBlocks(any(), anyLong(), anyLong(), anyLong(), any());
       }
     } finally {
       if (qjmhaCluster != null) {