HDFS-16633. Fixing when Reserved Space For Replicas is not released on some cases (#4452)
* HDFS-16633.Reserved Space For Replicas is not released on some cases Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
This commit is contained in:
parent
734b6f19ad
commit
b7edc6c60c
@ -307,6 +307,17 @@ Replica getReplica() {
|
||||
return replicaInfo;
|
||||
}
|
||||
|
||||
public void releaseAnyRemainingReservedSpace() {
|
||||
if (replicaInfo != null) {
|
||||
if (replicaInfo.getReplicaInfo().getBytesReserved() > 0) {
|
||||
LOG.warn("Block {} has not released the reserved bytes. "
|
||||
+ "Releasing {} bytes as part of close.", replicaInfo.getBlockId(),
|
||||
replicaInfo.getReplicaInfo().getBytesReserved());
|
||||
replicaInfo.releaseAllBytesReserved();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* close files and release volume reference.
|
||||
*/
|
||||
|
@ -951,6 +951,9 @@ public void writeBlock(final ExtendedBlock block,
|
||||
IOUtils.closeStream(mirrorIn);
|
||||
IOUtils.closeStream(replyOut);
|
||||
IOUtils.closeSocket(mirrorSock);
|
||||
if (blockReceiver != null) {
|
||||
blockReceiver.releaseAnyRemainingReservedSpace();
|
||||
}
|
||||
IOUtils.closeStream(blockReceiver);
|
||||
setCurrentBlockReceiver(null);
|
||||
}
|
||||
|
@ -174,6 +174,10 @@ public void releaseAllBytesReserved() {
|
||||
getVolume().releaseLockedMemory(bytesReserved);
|
||||
bytesReserved = 0;
|
||||
}
|
||||
@Override
|
||||
public void releaseReplicaInfoBytesReserved() {
|
||||
bytesReserved = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
|
||||
|
@ -51,6 +51,11 @@ public interface ReplicaInPipeline extends Replica {
|
||||
*/
|
||||
public void releaseAllBytesReserved();
|
||||
|
||||
/**
|
||||
* Release the reserved space from the ReplicaInfo.
|
||||
*/
|
||||
void releaseReplicaInfoBytesReserved();
|
||||
|
||||
/**
|
||||
* store the checksum for the last chunk along with the data length
|
||||
* @param dataLength number of bytes on disk
|
||||
|
@ -2017,6 +2017,9 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
|
||||
|
||||
newReplicaInfo = v.addFinalizedBlock(
|
||||
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
|
||||
if (replicaInfo instanceof ReplicaInPipeline) {
|
||||
((ReplicaInPipeline) replicaInfo).releaseReplicaInfoBytesReserved();
|
||||
}
|
||||
if (v.isTransientStorage()) {
|
||||
releaseLockedMemory(
|
||||
replicaInfo.getOriginalBytesReserved()
|
||||
|
@ -356,6 +356,10 @@ synchronized public void setBytesAcked(long bytesAcked) {
|
||||
public void releaseAllBytesReserved() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseReplicaInfoBytesReserved() {
|
||||
}
|
||||
|
||||
@Override
|
||||
synchronized public long getBytesOnDisk() {
|
||||
if (finalized) {
|
||||
@ -418,7 +422,6 @@ public void waitForMinLength(long minLength, long time, TimeUnit unit)
|
||||
} while (deadLine > System.currentTimeMillis());
|
||||
throw new IOException("Minimum length was not achieved within timeout");
|
||||
}
|
||||
|
||||
@Override
|
||||
public FsVolumeSpi getVolume() {
|
||||
return getStorage(theBlock).getVolume();
|
||||
|
@ -45,6 +45,10 @@ public long getBytesAcked() {
|
||||
public void setBytesAcked(long bytesAcked) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseReplicaInfoBytesReserved() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void releaseAllBytesReserved() {
|
||||
}
|
||||
|
@ -18,8 +18,14 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -746,4 +752,48 @@ public Boolean get() {
|
||||
}, 500, 30000);
|
||||
checkReservedSpace(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that bytes reserved of ReplicaInfo gets cleared
|
||||
* during finalize.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test(timeout = 300000)
|
||||
public void testReplicaInfoBytesReservedReleasedOnFinalize() throws IOException {
|
||||
short replication = 3;
|
||||
int bufferLength = 4096;
|
||||
startCluster(BLOCK_SIZE, replication, -1);
|
||||
|
||||
String methodName = GenericTestUtils.getMethodName();
|
||||
Path path = new Path("/" + methodName + ".01.dat");
|
||||
|
||||
FSDataOutputStream fos =
|
||||
fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), bufferLength,
|
||||
replication, BLOCK_SIZE, null);
|
||||
// Allocate a block.
|
||||
fos.write(new byte[bufferLength]);
|
||||
fos.hsync();
|
||||
|
||||
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||
long expectedReservedSpace = BLOCK_SIZE - bufferLength;
|
||||
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(fsDataSetImpl, bpid);
|
||||
ReplicaInfo r = replicas.iterator().next();
|
||||
|
||||
// Verify Initial Bytes Reserved for Replica and Volume are correct
|
||||
assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(),
|
||||
expectedReservedSpace);
|
||||
assertEquals(r.getBytesReserved(), expectedReservedSpace);
|
||||
|
||||
// Verify Bytes Reserved for Replica and Volume are correct after finalize
|
||||
fsDataSetImpl.finalizeNewReplica(r, new ExtendedBlock(bpid, r));
|
||||
|
||||
assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(), 0L);
|
||||
assertEquals(r.getBytesReserved(), 0L);
|
||||
|
||||
fos.close();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user