HDFS-13540. DFSStripedInputStream should only allocate new buffers when reading. Contributed by Xiao Chen.

This commit is contained in:
Sammi Chen 2018-05-23 19:10:09 +08:00
parent fed2bef647
commit 34e8b9f9a8
3 changed files with 64 additions and 5 deletions

View File

@ -116,4 +116,16 @@ public synchronized void putBuffer(ByteBuffer buffer) {
// poor granularity.
}
}
/**
* Get the size of the buffer pool, for the specified buffer type.
*
* @param direct Whether the size is returned for direct buffers
* @return The size
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public int size(boolean direct) {
return getBufferTree(direct).size();
}
}

View File

@ -116,12 +116,14 @@ private boolean useDirectBuffer() {
return decoder.preferDirectBuffer();
}
void resetCurStripeBuffer() {
if (curStripeBuf == null) {
private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
if (shouldAllocateBuf && curStripeBuf == null) {
curStripeBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
cellSize * dataBlkNum);
}
if (curStripeBuf != null) {
curStripeBuf.clear();
}
curStripeRange = new StripeRange(0, 0);
}
@ -206,7 +208,7 @@ public synchronized void close() throws IOException {
*/
@Override
protected void closeCurrentBlockReaders() {
resetCurStripeBuffer();
resetCurStripeBuffer(false);
if (blockReaders == null || blockReaders.length == 0) {
return;
}
@ -296,7 +298,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
*/
private void readOneStripe(CorruptedBlocks corruptedBlocks)
throws IOException {
resetCurStripeBuffer();
resetCurStripeBuffer(true);
// compute stripe range based on pos
final long offsetInBlockGroup = getOffsetInBlockGroup();

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
@ -529,4 +530,48 @@ public void testReadFailToGetCurrentBlock() throws Exception {
}
}
}
@Test
public void testCloseDoesNotAllocateNewBuffer() throws Exception {
final int numBlocks = 2;
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
stripesPerBlock, false, ecPolicy);
try (DFSInputStream in = fs.getClient().open(filePath.toString())) {
assertTrue(in instanceof DFSStripedInputStream);
final DFSStripedInputStream stream = (DFSStripedInputStream) in;
final ElasticByteBufferPool ebbp =
(ElasticByteBufferPool) stream.getBufferPool();
// first clear existing pool
LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
+ ebbp.size(false));
emptyBufferPoolForCurrentPolicy(ebbp, true);
emptyBufferPoolForCurrentPolicy(ebbp, false);
final int startSizeDirect = ebbp.size(true);
final int startSizeIndirect = ebbp.size(false);
// close should not allocate new buffers in the pool.
stream.close();
assertEquals(startSizeDirect, ebbp.size(true));
assertEquals(startSizeIndirect, ebbp.size(false));
}
}
/**
* Empties the pool for the specified buffer type, for the current ecPolicy.
* <p>
* Note that {@link #ecPolicy} may change for difference test cases in
* {@link TestDFSStripedInputStreamWithRandomECPolicy}.
*/
private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
boolean direct) {
int size;
while ((size = ebbp.size(direct)) != 0) {
ebbp.getBuffer(direct,
ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
if (size == ebbp.size(direct)) {
// if getBuffer didn't decrease size, it means the pool for the buffer
// corresponding to current ecPolicy is empty
break;
}
}
}
}