HDFS-14308. DFSStripedInputStream curStripeBuf is not freed by unbuffer() (#1667)
Reviewed-by: Aravindan Vijayan <avijayan@cloudera.com> Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
parent
eef34f2d87
commit
30db895b59
@ -70,7 +70,8 @@ public class DFSStripedInputStream extends DFSInputStream {
|
|||||||
private final int groupSize;
|
private final int groupSize;
|
||||||
/** the buffer for a complete stripe. */
|
/** the buffer for a complete stripe. */
|
||||||
private ByteBuffer curStripeBuf;
|
private ByteBuffer curStripeBuf;
|
||||||
private ByteBuffer parityBuf;
|
@VisibleForTesting
|
||||||
|
protected ByteBuffer parityBuf;
|
||||||
private final ErasureCodingPolicy ecPolicy;
|
private final ErasureCodingPolicy ecPolicy;
|
||||||
private RawErasureDecoder decoder;
|
private RawErasureDecoder decoder;
|
||||||
|
|
||||||
@ -129,7 +130,7 @@ private void resetCurStripeBuffer(boolean shouldAllocateBuf) {
|
|||||||
curStripeRange = new StripeRange(0, 0);
|
curStripeRange = new StripeRange(0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ByteBuffer getParityBuffer() {
|
protected synchronized ByteBuffer getParityBuffer() {
|
||||||
if (parityBuf == null) {
|
if (parityBuf == null) {
|
||||||
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
parityBuf = BUFFER_POOL.getBuffer(useDirectBuffer(),
|
||||||
cellSize * parityBlkNum);
|
cellSize * parityBlkNum);
|
||||||
@ -554,4 +555,17 @@ public synchronized void releaseBuffer(ByteBuffer buffer) {
|
|||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
"Not support enhanced byte buffer access.");
|
"Not support enhanced byte buffer access.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void unbuffer() {
|
||||||
|
super.unbuffer();
|
||||||
|
if (curStripeBuf != null) {
|
||||||
|
BUFFER_POOL.putBuffer(curStripeBuf);
|
||||||
|
curStripeBuf = null;
|
||||||
|
}
|
||||||
|
if (parityBuf != null) {
|
||||||
|
BUFFER_POOL.putBuffer(parityBuf);
|
||||||
|
parityBuf = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -627,4 +627,41 @@ private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnbuffer() throws Exception {
|
||||||
|
final int numBlocks = 2;
|
||||||
|
final int fileSize = numBlocks * blockGroupSize;
|
||||||
|
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
|
||||||
|
stripesPerBlock, false, ecPolicy);
|
||||||
|
LocatedBlocks lbs = fs.getClient().namenode.
|
||||||
|
getBlockLocations(filePath.toString(), 0, fileSize);
|
||||||
|
|
||||||
|
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
|
||||||
|
assert lb instanceof LocatedStripedBlock;
|
||||||
|
LocatedStripedBlock bg = (LocatedStripedBlock)(lb);
|
||||||
|
for (int i = 0; i < dataBlocks; i++) {
|
||||||
|
Block blk = new Block(bg.getBlock().getBlockId() + i,
|
||||||
|
stripesPerBlock * cellSize,
|
||||||
|
bg.getBlock().getGenerationStamp());
|
||||||
|
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
|
||||||
|
cluster.injectBlocks(i, Arrays.asList(blk),
|
||||||
|
bg.getBlock().getBlockPoolId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
|
||||||
|
filePath.toString(), false, ecPolicy, null);
|
||||||
|
ByteBuffer readBuffer = ByteBuffer.allocate(fileSize);
|
||||||
|
int done = 0;
|
||||||
|
while (done < fileSize) {
|
||||||
|
int ret = in.read(readBuffer);
|
||||||
|
assertTrue(ret > 0);
|
||||||
|
done += ret;
|
||||||
|
}
|
||||||
|
in.unbuffer();
|
||||||
|
ByteBuffer curStripeBuf = (in.getCurStripeBuf());
|
||||||
|
assertNull(curStripeBuf);
|
||||||
|
assertNull(in.parityBuf);
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user