diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index e20f4371d1..edc22e8bbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -307,6 +307,17 @@ Replica getReplica() {
     return replicaInfo;
   }
 
+  public void releaseAnyRemainingReservedSpace() {
+    if (replicaInfo != null) {
+      if (replicaInfo.getReplicaInfo().getBytesReserved() > 0) {
+        LOG.warn("Block {} has not released the reserved bytes. "
+            + "Releasing {} bytes as part of close.", replicaInfo.getBlockId(),
+            replicaInfo.getReplicaInfo().getBytesReserved());
+        replicaInfo.releaseAllBytesReserved();
+      }
+    }
+  }
+
   /**
    * close files and release volume reference.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a511e81951..300d84157b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -947,6 +947,9 @@ public void writeBlock(final ExtendedBlock block,
       IOUtils.closeStream(mirrorIn);
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
+      if (blockReceiver != null) {
+        blockReceiver.releaseAnyRemainingReservedSpace();
+      }
       IOUtils.closeStream(blockReceiver);
       setCurrentBlockReceiver(null);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
index 99d2fc8e04..24b6bd550e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
@@ -174,6 +174,10 @@ public void releaseAllBytesReserved() {
     getVolume().releaseLockedMemory(bytesReserved);
     bytesReserved = 0;
   }
+  @Override
+  public void releaseReplicaInfoBytesReserved() {
+    bytesReserved = 0;
+  }
 
   @Override
   public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 174827b5a2..65da42d3a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -51,6 +51,11 @@ public interface ReplicaInPipeline extends Replica {
    */
   public void releaseAllBytesReserved();
 
+  /**
+   * Release the reserved space from the ReplicaInfo.
+   */
+  void releaseReplicaInfoBytesReserved();
+
   /**
    * store the checksum for the last chunk along with the data length
    * @param dataLength number of bytes on disk
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a54d4ef1b9..054c2c347d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1826,6 +1826,9 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
 
       newReplicaInfo = v.addFinalizedBlock(
           bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
+      if (replicaInfo instanceof ReplicaInPipeline) {
+        ((ReplicaInPipeline) replicaInfo).releaseReplicaInfoBytesReserved();
+      }
       if (v.isTransientStorage()) {
         releaseLockedMemory(
             replicaInfo.getOriginalBytesReserved()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 6740343e83..ffb6026a97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -354,6 +354,10 @@ synchronized public void setBytesAcked(long bytesAcked) {
     public void releaseAllBytesReserved() {
     }
 
+    @Override
+    public void releaseReplicaInfoBytesReserved() {
+    }
+
     @Override
     synchronized public long getBytesOnDisk() {
       if (finalized) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index b135411203..c9979d530b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -44,6 +44,10 @@ public long getBytesAcked() {
   public void setBytesAcked(long bytesAcked) {
   }
 
+  @Override
+  public void releaseReplicaInfoBytesReserved() {
+  }
+
   @Override
   public void releaseAllBytesReserved() {
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
index 9d5bfd7b2e..70c00d7b31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
@@ -32,14 +32,18 @@
 
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.ipc.RemoteException;
@@ -57,6 +61,8 @@
 import java.io.OutputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeoutException;
@@ -745,4 +751,48 @@ public Boolean get() {
     }, 500, 30000);
     checkReservedSpace(0);
   }
+
+  /**
+   * Ensure that the bytes reserved in the ReplicaInfo are cleared
+   * during finalize.
+   *
+   * @throws IOException
+   */
+  @Test(timeout = 300000)
+  public void testReplicaInfoBytesReservedReleasedOnFinalize() throws IOException {
+    short replication = 3;
+    int bufferLength = 4096;
+    startCluster(BLOCK_SIZE, replication, -1);
+
+    String methodName = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + methodName + ".01.dat");
+
+    FSDataOutputStream fos =
+        fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), bufferLength,
+            replication, BLOCK_SIZE, null);
+    // Allocate a block.
+    fos.write(new byte[bufferLength]);
+    fos.hsync();
+
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
+    long expectedReservedSpace = BLOCK_SIZE - bufferLength;
+
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(fsDataSetImpl, bpid);
+    ReplicaInfo r = replicas.iterator().next();
+
+    // Verify that the initial bytes reserved for the replica and the volume are correct.
+    assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(),
+        expectedReservedSpace);
+    assertEquals(r.getBytesReserved(), expectedReservedSpace);
+
+    // Verify that the bytes reserved for the replica and the volume are cleared after finalize.
+    fsDataSetImpl.finalizeNewReplica(r, new ExtendedBlock(bpid, r));
+
+    assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(), 0L);
+    assertEquals(r.getBytesReserved(), 0L);
+
+    fos.close();
+  }
 }
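For readers following the reservation accounting across these files, below is a minimal, self-contained sketch of the release-on-close pattern the patch implements. ReservationTracker and SketchReceiver are hypothetical names invented for illustration, not HDFS classes; the real logic is spread across FsVolumeImpl, ReplicaInPipeline, BlockReceiver, and FsDatasetImpl.

// Illustrative sketch only, not HDFS code.
import java.util.concurrent.atomic.AtomicLong;

class ReservationTracker {
  // Mirrors the volume-level counter of space reserved for in-flight replicas.
  private final AtomicLong reserved = new AtomicLong();

  void reserve(long bytes) {
    reserved.addAndGet(bytes);
  }

  void release(long bytes) {
    reserved.addAndGet(-bytes);
  }

  long getReserved() {
    return reserved.get();
  }
}

class SketchReceiver implements AutoCloseable {
  private final ReservationTracker volume;
  private long bytesReserved; // per-replica counter, like ReplicaInPipeline

  SketchReceiver(ReservationTracker volume, long blockSize) {
    this.volume = volume;
    this.bytesReserved = blockSize;
    volume.reserve(blockSize); // reserve a full block up front
  }

  // Normal path: finalize returns the unused tail of the reservation to the
  // volume and zeroes the per-replica counter (the role of
  // releaseReplicaInfoBytesReserved() in the patch), so a later close
  // cannot release the same bytes twice.
  void finalizeReplica(long bytesWritten) {
    volume.release(bytesReserved - bytesWritten);
    bytesReserved = 0;
  }

  // Failure path: if the write pipeline dies before finalize, close returns
  // whatever is still reserved (the role of
  // releaseAnyRemainingReservedSpace() in the patch).
  @Override
  public void close() {
    if (bytesReserved > 0) {
      volume.release(bytesReserved);
      bytesReserved = 0;
    }
  }
}

With this shape, a close() that runs after finalizeReplica() is a no-op: finalize has already zeroed the per-replica counter, so the new close-path release in DataXceiver cannot double-release space that addFinalizedBlock() already accounted for. That is the invariant the FsDatasetImpl change preserves.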