HDFS-11634. Optimize BlockIterator when iterating starts in the middle. Contributed by Konstantin V Shvachko.

Konstantin V Shvachko 2017-04-17 15:04:06 -07:00
parent c0ca785dbb
commit 8dfcd95d58
4 changed files with 115 additions and 16 deletions
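
The gist of the change: getBlocksWithLocations() used to start a DataNode block iteration at a random index by creating the iterator at block 0 and calling next() startBlock times. BlockIterator now takes the start index directly and uses per-storage block counts to skip whole storages, stepping block-by-block only inside the storage that contains the starting position. Below is a self-contained toy sketch of that arithmetic (hypothetical demo types, not the actual HDFS classes):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of the skip logic: each inner list stands in for the blocks of
// one DatanodeStorageInfo. We want an iterator over all blocks that starts
// at global index startBlock without calling next() startBlock times.
final class StartBlockDemo {
  static <T> Iterator<T> iteratorFrom(List<List<T>> storages, int startBlock) {
    if (startBlock < 0) {
      throw new IllegalArgumentException("Illegal value startBlock = " + startBlock);
    }
    int s = startBlock;   // offset still left to skip
    int sumBlocks = 0;    // running block total across storages
    List<T> rest = new ArrayList<>();
    for (List<T> storage : storages) {
      sumBlocks += storage.size();
      if (sumBlocks <= startBlock) {
        s -= storage.size();  // this whole storage precedes startBlock: skip it
      } else {
        // s < storage.size() holds for the first storage taken, 0 afterwards
        rest.addAll(storage.subList(s, storage.size()));
        s = 0;
      }
    }
    return rest.iterator();  // empty if startBlock >= total block count
  }
}

The real constructor does the equivalent in place: storages wholly before startBlock are never added to iterators, and the remaining in-storage offset is consumed by the for(; s > 0 && hasNext(); s--) loop in the DatanodeDescriptor diff below.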

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1372,13 +1372,9 @@ public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
-    Iterator<BlockInfo> iter = node.getBlockIterator();
     // starting from a random block
     int startBlock = ThreadLocalRandom.current().nextInt(numBlocks);
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
+    Iterator<BlockInfo> iter = node.getBlockIterator(startBlock);
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
     BlockInfo curBlock;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -519,18 +519,35 @@ private static class BlockIterator implements Iterator<BlockInfo> {
     private int index = 0;
     private final List<Iterator<BlockInfo>> iterators;
 
-    private BlockIterator(final DatanodeStorageInfo... storages) {
+    private BlockIterator(final int startBlock,
+                          final DatanodeStorageInfo... storages) {
+      if(startBlock < 0) {
+        throw new IllegalArgumentException(
+            "Illegal value startBlock = " + startBlock);
+      }
       List<Iterator<BlockInfo>> iterators = new ArrayList<>();
+      int s = startBlock;
+      int sumBlocks = 0;
       for (DatanodeStorageInfo e : storages) {
+        int numBlocks = e.numBlocks();
+        sumBlocks += numBlocks;
+        if(sumBlocks <= startBlock) {
+          s -= numBlocks;
+        } else {
           iterators.add(e.getBlockIterator());
+        }
       }
       this.iterators = Collections.unmodifiableList(iterators);
+      // skip to the storage containing startBlock
+      for(; s > 0 && hasNext(); s--) {
+        next();
+      }
     }
 
     @Override
     public boolean hasNext() {
       update();
-      return !iterators.isEmpty() && iterators.get(index).hasNext();
+      return index < iterators.size() && iterators.get(index).hasNext();
     }
 
     @Override
@@ -552,7 +569,14 @@ private void update() {
     }
 
   Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(getStorageInfos());
+    return getBlockIterator(0);
+  }
+
+  /**
+   * Get iterator, which starts iterating from the specified block.
+   */
+  Iterator<BlockInfo> getBlockIterator(final int startBlock) {
+    return new BlockIterator(startBlock, getStorageInfos());
   }
 
   void incrementPendingReplicationWithoutTargets() {
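
Design note on the hasNext() change: since storages that lie entirely before startBlock are now never added, iterators can legitimately be empty (for instance when startBlock is at or beyond the DataNode's total block count), and index < iterators.size() guards that case just as !iterators.isEmpty() did, while checking directly against the cursor the rest of the class uses. Roughly, for S storages of about B blocks each, positioning used to cost up to S*B - 1 next() calls and now costs S numBlocks() lookups plus at most B - 1 next() calls.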

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestGetBlocks.java

@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -38,9 +39,13 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.ipc.RemoteException;
@@ -182,12 +187,14 @@ public void testGetBlocks() throws Exception {
     CONF.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DEFAULT_BLOCK_SIZE);
 
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
-        REPLICATION_FACTOR).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF)
+        .numDataNodes(REPLICATION_FACTOR)
+        .storagesPerDatanode(4)
+        .build();
     try {
       cluster.waitActive();
       // the third block will not be visible to getBlocks
-      long fileLen = 2 * DEFAULT_BLOCK_SIZE + 1;
+      long fileLen = 12 * DEFAULT_BLOCK_SIZE + 1;
       DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/tmp.txt"),
           fileLen, REPLICATION_FACTOR, 0L);
@@ -195,12 +202,12 @@ public void testGetBlocks() throws Exception {
     List<LocatedBlock> locatedBlocks;
     DatanodeInfo[] dataNodes = null;
     boolean notWritten;
-    do {
-      final DFSClient dfsclient = new DFSClient(
-          DFSUtilClient.getNNAddress(CONF), CONF);
+    final DFSClient dfsclient = new DFSClient(
+        DFSUtilClient.getNNAddress(CONF), CONF);
+    do {
       locatedBlocks = dfsclient.getNamenode()
           .getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
-      assertEquals(3, locatedBlocks.size());
+      assertEquals(13, locatedBlocks.size());
       notWritten = false;
       for (int i = 0; i < 2; i++) {
         dataNodes = locatedBlocks.get(i).getLocations();
@@ -214,6 +221,7 @@ public void testGetBlocks() throws Exception {
         }
       }
     } while (notWritten);
+    dfsclient.close();
 
     // get RPC client to namenode
     InetSocketAddress addr = new InetSocketAddress("localhost",
@@ -224,7 +232,7 @@ public void testGetBlocks() throws Exception {
 
       // get blocks of size fileLen from dataNodes[0]
       BlockWithLocations[] locs;
       locs = namenode.getBlocks(dataNodes[0], fileLen).getBlocks();
-      assertEquals(locs.length, 2);
+      assertEquals(locs.length, 12);
       assertEquals(locs[0].getStorageIDs().length, 2);
       assertEquals(locs[1].getStorageIDs().length, 2);
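
On the updated numbers: fileLen = 12 * DEFAULT_BLOCK_SIZE + 1 produces 12 full blocks plus a trailing 1-byte block, so getBlockLocations() reports 13 blocks, while getBlocks() filters blocks smaller than DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY (set to DEFAULT_BLOCK_SIZE above) and therefore returns only the 12 full ones — the same arrangement as the old 3-versus-2 setup, just scaled up to exercise multiple storages.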
@@ -247,6 +255,8 @@ public void testGetBlocks() throws Exception {
       // get blocks of size BlockSize from a non-existent datanode
       DatanodeInfo info = DFSTestUtil.getDatanodeInfo("1.2.3.4");
       getBlocksWithException(namenode, info, 2);
+
+      testBlockIterator(cluster);
     } finally {
       cluster.shutdown();
     }
@@ -264,6 +274,59 @@ private void getBlocksWithException(NamenodeProtocol namenode,
     assertTrue(getException);
   }
 
+  /**
+   * BlockIterator iterates over all blocks belonging to DatanodeDescriptor
+   * through multiple storages.
+   * The test verifies that BlockIterator can be set to start iterating from
+   * a particular starting block index.
+   */
+  void testBlockIterator(MiniDFSCluster cluster) {
+    FSNamesystem ns = cluster.getNamesystem();
+    String dId = cluster.getDataNodes().get(0).getDatanodeUuid();
+    DatanodeDescriptor dnd = BlockManagerTestUtil.getDatanode(ns, dId);
+    DatanodeStorageInfo[] storages = dnd.getStorageInfos();
+    assertEquals("DataNode should have 4 storages", 4, storages.length);
+
+    Iterator<BlockInfo> dnBlockIt = null;
+    // check illegal start block number
+    try {
+      dnBlockIt = BlockManagerTestUtil.getBlockIterator(
+          cluster.getNamesystem(), dId, -1);
+      assertTrue("Should throw IllegalArgumentException", false);
+    } catch(IllegalArgumentException ei) {
+      // as expected
+    }
+    assertNull("Iterator should be null", dnBlockIt);
+
+    // form an array of all DataNode blocks
+    int numBlocks = dnd.numBlocks();
+    BlockInfo[] allBlocks = new BlockInfo[numBlocks];
+    int idx = 0;
+    for(DatanodeStorageInfo s : storages) {
+      Iterator<BlockInfo> storageBlockIt =
+          BlockManagerTestUtil.getBlockIterator(s);
+      while(storageBlockIt.hasNext()) {
+        allBlocks[idx++] = storageBlockIt.next();
+      }
+    }
+
+    // check iterator for every block as a starting point
+    for(int i = 0; i < allBlocks.length; i++) {
+      // create iterator starting from i
+      dnBlockIt = BlockManagerTestUtil.getBlockIterator(ns, dId, i);
+      assertTrue("Block iterator should have next block", dnBlockIt.hasNext());
+      // check iterator lists blocks in the desired order
+      for(int j = i; j < allBlocks.length; j++) {
+        assertEquals("Wrong block order", allBlocks[j], dnBlockIt.next());
+      }
+    }
+
+    // check start block number larger than numBlocks in the DataNode
+    dnBlockIt = BlockManagerTestUtil.getBlockIterator(
+        ns, dId, allBlocks.length + 1);
+    assertFalse("Iterator should not have next block", dnBlockIt.hasNext());
+  }
+
   @Test
   public void testBlockKey() {
     Map<Block, Long> map = new HashMap<Block, Long>();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +56,21 @@ public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
     }
   }
 
+  public static Iterator<BlockInfo> getBlockIterator(final FSNamesystem ns,
+      final String storageID, final int startBlock) {
+    ns.readLock();
+    try {
+      DatanodeDescriptor dn =
+          ns.getBlockManager().getDatanodeManager().getDatanode(storageID);
+      return dn.getBlockIterator(startBlock);
+    } finally {
+      ns.readUnlock();
+    }
+  }
+
+  public static Iterator<BlockInfo> getBlockIterator(DatanodeStorageInfo s) {
+    return s.getBlockIterator();
+  }
+
   /**
    * Refresh block queue counts on the name-node.
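
Design note on the new helper: constructing the iterator reads live DatanodeDescriptor state (per-storage block counts) and may advance storage iterators, so it is wrapped in ns.readLock()/ns.readUnlock(); the returned iterator is then consumed outside the lock, which appears acceptable for this test since the block population is stable by the time it runs.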