HDFS-13222. Update getBlocks method to take minBlockSize in RPC calls. Contributed by Bharat Viswanadham

This commit is contained in:
Tsz-Wo Nicholas Sze 2018-03-07 11:27:53 -08:00
parent e0307e53e2
commit 88fba00caa
11 changed files with 73 additions and 41 deletions

View File

@ -86,7 +86,8 @@ public GetBlocksResponseProto getBlocks(RpcController unused,
.build();
BlocksWithLocations blocks;
try {
blocks = impl.getBlocks(dnInfo, request.getSize());
blocks = impl.getBlocks(dnInfo, request.getSize(),
request.getMinBlockSize());
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -99,11 +99,12 @@ public Object getUnderlyingProxyObject() {
}
@Override
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize)
throws IOException {
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
.build();
.setMinBlockSize(minBlockSize).build();
try {
return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
.getBlocks());

View File

@ -785,7 +785,7 @@ Iterator<DBlock> getBlockIterator() {
private long getBlockList() throws IOException {
final long size = Math.min(getBlocksSize, blocksToReceive);
final BlocksWithLocations newBlksLocs =
nnc.getBlocks(getDatanodeInfo(), size);
nnc.getBlocks(getDatanodeInfo(), size, getBlocksMinBlockSize);
if (LOG.isTraceEnabled()) {
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "

View File

@ -162,9 +162,10 @@ AtomicLong getBytesMoved() {
}
/** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize)
throws IOException {
return namenode.getBlocks(datanode, size);
return namenode.getBlocks(datanode, size, minBlockSize);
}
/**

View File

@ -408,13 +408,6 @@ public long getTotalECBlockGroups() {
*/
private int numBlocksPerIteration;
/**
* Minimum size that a block can be sent to Balancer through getBlocks.
* And after HDFS-8824, the small blocks are unused anyway, so there's no
* point to send them to balancer.
*/
private long getBlocksMinBlockSize = -1;
/**
* Progress of the Reconstruction queues initialisation.
*/
@ -539,9 +532,6 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
this.numBlocksPerIteration = conf.getInt(
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
this.getBlocksMinBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
final int minMaintenanceR = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
@ -1469,7 +1459,8 @@ public boolean isSufficientlyReplicated(BlockInfo b) {
/** Get all blocks with location information from a datanode. */
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
final long size) throws UnregisteredNodeException {
final long size, final long minBlockSize) throws
UnregisteredNodeException {
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
if (node == null) {
blockLog.warn("BLOCK* getBlocks: Asking for blocks from an" +
@ -1491,7 +1482,7 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < getBlocksMinBlockSize) {
if (curBlock.getNumBytes() < minBlockSize) {
continue;
}
totalSize += addBlock(curBlock, results);
@ -1501,7 +1492,7 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
for(int i=0; i<startBlock&&totalSize<size; i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
if (curBlock.getNumBytes() < getBlocksMinBlockSize) {
if (curBlock.getNumBytes() < minBlockSize) {
continue;
}
totalSize += addBlock(curBlock, results);

View File

@ -1718,13 +1718,14 @@ public boolean isInStandbyState() {
* @param datanode on which blocks are located
* @param size total size of blocks
*/
public BlocksWithLocations getBlocks(DatanodeID datanode, long size)
throws IOException {
public BlocksWithLocations getBlocks(DatanodeID datanode, long size, long
minimumBlockSize) throws IOException {
checkOperation(OperationCategory.READ);
readLock();
try {
checkOperation(OperationCategory.READ);
return getBlockManager().getBlocksWithLocations(datanode, size);
return getBlockManager().getBlocksWithLocations(datanode, size,
minimumBlockSize);
} finally {
readUnlock("getBlocks");
}

View File

@ -618,15 +618,20 @@ private static UserGroupInformation getRemoteUser() throws IOException {
// NamenodeProtocol
/////////////////////////////////////////////////////
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize)
throws IOException {
if(size <= 0) {
throw new IllegalArgumentException(
"Unexpected not positive size: "+size);
"Unexpected not positive size: "+size);
}
if(minBlockSize < 0) {
throw new IllegalArgumentException(
"Unexpected not positive size: "+size);
}
checkNNStartup();
namesystem.checkSuperuserPrivilege();
return namesystem.getBlocks(datanode, size);
return namesystem.getBlocks(datanode, size, minBlockSize);
}
@Override // NamenodeProtocol

View File

@ -67,17 +67,18 @@ public interface NamenodeProtocol {
/**
* Get a list of blocks belonging to <code>datanode</code>
* whose total size equals <code>size</code>.
*
*
* @see org.apache.hadoop.hdfs.server.balancer.Balancer
* @param datanode a data node
* @param size requested size
* @param minBlockSize each block should be of this minimum Block Size
* @return a list of blocks & their locations
* @throws IOException if size is less than or equal to 0 or
datanode does not exist
datanode does not exist
*/
@Idempotent
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException;
BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize) throws IOException;
/**
* Get the current block keys

View File

@ -43,6 +43,7 @@ import "HdfsServer.proto";
message GetBlocksRequestProto {
required DatanodeIDProto datanode = 1; // Datanode ID
required uint64 size = 2; // Size in bytes
optional uint64 minBlockSize = 3 [default = 0]; // Minimum Block Size in bytes
}

View File

@ -229,32 +229,48 @@ public void testGetBlocks() throws Exception {
NamenodeProtocol namenode = NameNodeProxies.createProxy(CONF,
DFSUtilClient.getNNUri(addr), NamenodeProtocol.class).getProxy();
// get blocks of size fileLen from dataNodes[0]
// get blocks of size fileLen from dataNodes[0], with minBlockSize as
// fileLen
BlockWithLocations[] locs;
locs = namenode.getBlocks(dataNodes[0], fileLen).getBlocks();
assertEquals(locs.length, 12);
// Should return all 13 blocks, as minBlockSize is not passed
locs = namenode.getBlocks(dataNodes[0], fileLen, 0)
.getBlocks();
assertEquals(13, locs.length);
assertEquals(locs[0].getStorageIDs().length, 2);
assertEquals(locs[1].getStorageIDs().length, 2);
// Should return 12 blocks, as minBlockSize is DEFAULT_BLOCK_SIZE
locs = namenode.getBlocks(dataNodes[0], fileLen, DEFAULT_BLOCK_SIZE)
.getBlocks();
assertEquals(12, locs.length);
assertEquals(locs[0].getStorageIDs().length, 2);
assertEquals(locs[1].getStorageIDs().length, 2);
// get blocks of size BlockSize from dataNodes[0]
locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE).getBlocks();
locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE,
DEFAULT_BLOCK_SIZE).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getStorageIDs().length, 2);
// get blocks of size 1 from dataNodes[0]
locs = namenode.getBlocks(dataNodes[0], 1).getBlocks();
locs = namenode.getBlocks(dataNodes[0], 1, 1).getBlocks();
assertEquals(locs.length, 1);
assertEquals(locs[0].getStorageIDs().length, 2);
// get blocks of size 0 from dataNodes[0]
getBlocksWithException(namenode, dataNodes[0], 0);
getBlocksWithException(namenode, dataNodes[0], 0, 0);
// get blocks of size -1 from dataNodes[0]
getBlocksWithException(namenode, dataNodes[0], -1);
getBlocksWithException(namenode, dataNodes[0], -1, 0);
// minBlockSize is -1
getBlocksWithException(namenode, dataNodes[0], DEFAULT_BLOCK_SIZE, -1);
// get blocks of size BlockSize from a non-existent datanode
DatanodeInfo info = DFSTestUtil.getDatanodeInfo("1.2.3.4");
getBlocksWithException(namenode, info, 2);
getBlocksWithIncorrectDatanodeException(namenode, info, 2, 0);
testBlockIterator(cluster);
} finally {
@ -263,10 +279,24 @@ public void testGetBlocks() throws Exception {
}
private void getBlocksWithException(NamenodeProtocol namenode,
DatanodeInfo datanode, long size) throws IOException {
DatanodeInfo datanode, long size, long minBlockSize) throws IOException {
boolean getException = false;
try {
namenode.getBlocks(DFSTestUtil.getLocalDatanodeInfo(), 2);
namenode.getBlocks(datanode, size, minBlockSize);
} catch (RemoteException e) {
getException = true;
assertTrue(e.getClassName().contains("IllegalArgumentException"));
}
assertTrue(getException);
}
private void getBlocksWithIncorrectDatanodeException(
NamenodeProtocol namenode, DatanodeInfo datanode, long size,
long minBlockSize)
throws IOException {
boolean getException = false;
try {
namenode.getBlocks(datanode, size, minBlockSize);
} catch (RemoteException e) {
getException = true;
assertTrue(e.getClassName().contains("HadoopIllegalArgumentException"));

View File

@ -2072,7 +2072,7 @@ public BlocksWithLocations answer(InvocationOnMock invocation)
endGetBlocksTime = Time.monotonicNow();
numGetBlocksCalls++;
return blk;
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
}
/**