HDFS-16633. Fixing when Reserved Space For Replicas is not released on some cases (#4452)
* HDFS-16633.Reserved Space For Replicas is not released on some cases Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com> (cherry picked from commit b7edc6c60ca9e17c4f6a7d07aad5efbd7c2e3b40) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
This commit is contained in:
parent
1cc8cb68f2
commit
fa2a0a603a
@ -307,6 +307,17 @@ class BlockReceiver implements Closeable {
|
|||||||
return replicaInfo;
|
return replicaInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void releaseAnyRemainingReservedSpace() {
|
||||||
|
if (replicaInfo != null) {
|
||||||
|
if (replicaInfo.getReplicaInfo().getBytesReserved() > 0) {
|
||||||
|
LOG.warn("Block {} has not released the reserved bytes. "
|
||||||
|
+ "Releasing {} bytes as part of close.", replicaInfo.getBlockId(),
|
||||||
|
replicaInfo.getReplicaInfo().getBytesReserved());
|
||||||
|
replicaInfo.releaseAllBytesReserved();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* close files and release volume reference.
|
* close files and release volume reference.
|
||||||
*/
|
*/
|
||||||
|
@ -947,6 +947,9 @@ class DataXceiver extends Receiver implements Runnable {
|
|||||||
IOUtils.closeStream(mirrorIn);
|
IOUtils.closeStream(mirrorIn);
|
||||||
IOUtils.closeStream(replyOut);
|
IOUtils.closeStream(replyOut);
|
||||||
IOUtils.closeSocket(mirrorSock);
|
IOUtils.closeSocket(mirrorSock);
|
||||||
|
if (blockReceiver != null) {
|
||||||
|
blockReceiver.releaseAnyRemainingReservedSpace();
|
||||||
|
}
|
||||||
IOUtils.closeStream(blockReceiver);
|
IOUtils.closeStream(blockReceiver);
|
||||||
setCurrentBlockReceiver(null);
|
setCurrentBlockReceiver(null);
|
||||||
}
|
}
|
||||||
|
@ -174,6 +174,10 @@ public class LocalReplicaInPipeline extends LocalReplica
|
|||||||
getVolume().releaseLockedMemory(bytesReserved);
|
getVolume().releaseLockedMemory(bytesReserved);
|
||||||
bytesReserved = 0;
|
bytesReserved = 0;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public void releaseReplicaInfoBytesReserved() {
|
||||||
|
bytesReserved = 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
|
public void setLastChecksumAndDataLen(long dataLength, byte[] checksum) {
|
||||||
|
@ -51,6 +51,11 @@ public interface ReplicaInPipeline extends Replica {
|
|||||||
*/
|
*/
|
||||||
public void releaseAllBytesReserved();
|
public void releaseAllBytesReserved();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Release the reserved space from the ReplicaInfo.
|
||||||
|
*/
|
||||||
|
void releaseReplicaInfoBytesReserved();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* store the checksum for the last chunk along with the data length
|
* store the checksum for the last chunk along with the data length
|
||||||
* @param dataLength number of bytes on disk
|
* @param dataLength number of bytes on disk
|
||||||
|
@ -1826,6 +1826,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||||||
|
|
||||||
newReplicaInfo = v.addFinalizedBlock(
|
newReplicaInfo = v.addFinalizedBlock(
|
||||||
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
|
bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved());
|
||||||
|
if (replicaInfo instanceof ReplicaInPipeline) {
|
||||||
|
((ReplicaInPipeline) replicaInfo).releaseReplicaInfoBytesReserved();
|
||||||
|
}
|
||||||
if (v.isTransientStorage()) {
|
if (v.isTransientStorage()) {
|
||||||
releaseLockedMemory(
|
releaseLockedMemory(
|
||||||
replicaInfo.getOriginalBytesReserved()
|
replicaInfo.getOriginalBytesReserved()
|
||||||
|
@ -354,6 +354,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||||||
public void releaseAllBytesReserved() {
|
public void releaseAllBytesReserved() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseReplicaInfoBytesReserved() {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
synchronized public long getBytesOnDisk() {
|
synchronized public long getBytesOnDisk() {
|
||||||
if (finalized) {
|
if (finalized) {
|
||||||
|
@ -44,6 +44,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline {
|
|||||||
public void setBytesAcked(long bytesAcked) {
|
public void setBytesAcked(long bytesAcked) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void releaseReplicaInfoBytesReserved() {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void releaseAllBytesReserved() {
|
public void releaseAllBytesReserved() {
|
||||||
}
|
}
|
||||||
|
@ -32,14 +32,18 @@ import static org.junit.Assert.fail;
|
|||||||
|
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.HdfsBlockLocation;
|
import org.apache.hadoop.fs.HdfsBlockLocation;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.*;
|
import org.apache.hadoop.hdfs.*;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
@ -57,6 +61,8 @@ import java.io.IOException;
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
@ -745,4 +751,48 @@ public class TestSpaceReservation {
|
|||||||
}, 500, 30000);
|
}, 500, 30000);
|
||||||
checkReservedSpace(0);
|
checkReservedSpace(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that bytes reserved of ReplicaInfo gets cleared
|
||||||
|
* during finalize.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testReplicaInfoBytesReservedReleasedOnFinalize() throws IOException {
|
||||||
|
short replication = 3;
|
||||||
|
int bufferLength = 4096;
|
||||||
|
startCluster(BLOCK_SIZE, replication, -1);
|
||||||
|
|
||||||
|
String methodName = GenericTestUtils.getMethodName();
|
||||||
|
Path path = new Path("/" + methodName + ".01.dat");
|
||||||
|
|
||||||
|
FSDataOutputStream fos =
|
||||||
|
fs.create(path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE), bufferLength,
|
||||||
|
replication, BLOCK_SIZE, null);
|
||||||
|
// Allocate a block.
|
||||||
|
fos.write(new byte[bufferLength]);
|
||||||
|
fos.hsync();
|
||||||
|
|
||||||
|
DataNode dataNode = cluster.getDataNodes().get(0);
|
||||||
|
FsDatasetImpl fsDataSetImpl = (FsDatasetImpl) dataNode.getFSDataset();
|
||||||
|
long expectedReservedSpace = BLOCK_SIZE - bufferLength;
|
||||||
|
|
||||||
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(fsDataSetImpl, bpid);
|
||||||
|
ReplicaInfo r = replicas.iterator().next();
|
||||||
|
|
||||||
|
// Verify Initial Bytes Reserved for Replica and Volume are correct
|
||||||
|
assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(),
|
||||||
|
expectedReservedSpace);
|
||||||
|
assertEquals(r.getBytesReserved(), expectedReservedSpace);
|
||||||
|
|
||||||
|
// Verify Bytes Reserved for Replica and Volume are correct after finalize
|
||||||
|
fsDataSetImpl.finalizeNewReplica(r, new ExtendedBlock(bpid, r));
|
||||||
|
|
||||||
|
assertEquals(fsDataSetImpl.getVolumeList().get(0).getReservedForReplicas(), 0L);
|
||||||
|
assertEquals(r.getBytesReserved(), 0L);
|
||||||
|
|
||||||
|
fos.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user