diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 922f74eaec..07e660934d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -177,14 +177,18 @@ public class DFSStripedInputStream extends DFSInputStream { @Override public synchronized void close() throws IOException { - super.close(); - if (curStripeBuf != null) { - BUFFER_POOL.putBuffer(curStripeBuf); - curStripeBuf = null; - } - if (parityBuf != null) { - BUFFER_POOL.putBuffer(parityBuf); - parityBuf = null; + try { + super.close(); + } finally { + if (curStripeBuf != null) { + BUFFER_POOL.putBuffer(curStripeBuf); + curStripeBuf = null; + } + if (parityBuf != null) { + BUFFER_POOL.putBuffer(parityBuf); + parityBuf = null; + } + decoder.release(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 52fc5ebf71..22b30e9306 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -1033,6 +1033,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { setClosed(); // shutdown executor of flushAll tasks flushAllExecutor.shutdownNow(); + encoder.release(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java index 9555618990..69173173fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockChecksumReconstructor.java @@ -75,29 +75,33 @@ public class StripedBlockChecksumReconstructor extends StripedReconstructor { public void reconstruct() throws IOException { MessageDigest digester = MD5Hash.getDigester(); long maxTargetLength = getMaxTargetLength(); - while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) { - long remaining = maxTargetLength - getPositionInBlock(); - final int toReconstructLen = (int) Math - .min(getStripedReader().getBufferSize(), remaining); - // step1: read from minimum source DNs required for reconstruction. - // The returned success list is the source DNs we do real read from - getStripedReader().readMinimumSources(toReconstructLen); + try { + while (requestedLen > 0 && getPositionInBlock() < maxTargetLength) { + long remaining = maxTargetLength - getPositionInBlock(); + final int toReconstructLen = (int) Math + .min(getStripedReader().getBufferSize(), remaining); + // step1: read from minimum source DNs required for reconstruction. + // The returned success list is the source DNs we do real read from + getStripedReader().readMinimumSources(toReconstructLen); - // step2: decode to reconstruct targets - reconstructTargets(toReconstructLen); + // step2: decode to reconstruct targets + reconstructTargets(toReconstructLen); - // step3: calculate checksum - checksumDataLen += checksumWithTargetOutput(targetBuffer.array(), - toReconstructLen, digester); + // step3: calculate checksum + checksumDataLen += checksumWithTargetOutput(targetBuffer.array(), + toReconstructLen, digester); - updatePositionInBlock(toReconstructLen); - requestedLen -= toReconstructLen; - clearBuffers(); + updatePositionInBlock(toReconstructLen); + requestedLen -= toReconstructLen; + clearBuffers(); + } + + byte[] digest = digester.digest(); + md5 = new MD5Hash(digest); + md5.write(checksumWriter); + } finally { + cleanup(); } - - byte[] digest = digester.digest(); - md5 = new MD5Hash(digest); - md5.write(checksumWriter); } private long checksumWithTargetOutput(byte[] outputData, int toReconstructLen, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java index a1da536ba5..1119bbbd23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java @@ -74,6 +74,7 @@ class StripedBlockReconstructor extends StripedReconstructor metrics.incrECReconstructionBytesWritten(getBytesWritten()); getStripedReader().close(); stripedWriter.close(); + cleanup(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index cd17864c94..b8433c7b6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -253,6 +253,12 @@ abstract class StripedReconstructor { return decoder; } + void cleanup() { + if (decoder != null) { + decoder.release(); + } + } + StripedReader getStripedReader() { return stripedReader; }