diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 53e2fc62ac..16df7091da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -991,8 +991,7 @@ class FsDatasetImpl implements FsDatasetSpi { replicaInfo, smallBufferSize, conf); // Finalize the copied files - newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo, - false); + newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo); try (AutoCloseableLock lock = datasetLock.acquire()) { // Increment numBlocks here as this block moved without knowing to BPS FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume(); @@ -1295,7 +1294,7 @@ class FsDatasetImpl implements FsDatasetSpi { replicaInfo.bumpReplicaGS(newGS); // finalize the replica if RBW if (replicaInfo.getState() == ReplicaState.RBW) { - finalizeReplica(b.getBlockPoolId(), replicaInfo, false); + finalizeReplica(b.getBlockPoolId(), replicaInfo); } return replicaInfo; } @@ -1625,23 +1624,39 @@ class FsDatasetImpl implements FsDatasetSpi { @Override // FsDatasetSpi public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException { + ReplicaInfo replicaInfo = null; + ReplicaInfo finalizedReplicaInfo = null; try (AutoCloseableLock lock = datasetLock.acquire()) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); } - ReplicaInfo replicaInfo = getReplicaInfo(b); + replicaInfo = getReplicaInfo(b); if (replicaInfo.getState() == ReplicaState.FINALIZED) { // this is legal, when recovery happens on a file that has // been opened for append but never modified return; } - finalizeReplica(b.getBlockPoolId(), replicaInfo, fsyncDir); + finalizedReplicaInfo = finalizeReplica(b.getBlockPoolId(), replicaInfo); + } + /* + * Sync the directory after rename from tmp/rbw to Finalized if + * configured. Though rename should be atomic operation, sync on both + * dest and src directories are done because IOUtils.fsync() calls + * directory's channel sync, not the journal itself. + */ + if (fsyncDir && finalizedReplicaInfo instanceof FinalizedReplica + && replicaInfo instanceof LocalReplica) { + FinalizedReplica finalizedReplica = + (FinalizedReplica) finalizedReplicaInfo; + finalizedReplica.fsyncDirectory(); + LocalReplica localReplica = (LocalReplica) replicaInfo; + localReplica.fsyncDirectory(); } } - private ReplicaInfo finalizeReplica(String bpid, - ReplicaInfo replicaInfo, boolean fsyncDir) throws IOException { + private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) + throws IOException { try (AutoCloseableLock lock = datasetLock.acquire()) { ReplicaInfo newReplicaInfo = null; if (replicaInfo.getState() == ReplicaState.RUR && @@ -1656,19 +1671,6 @@ class FsDatasetImpl implements FsDatasetSpi { newReplicaInfo = v.addFinalizedBlock( bpid, replicaInfo, replicaInfo, replicaInfo.getBytesReserved()); - /* - * Sync the directory after rename from tmp/rbw to Finalized if - * configured. Though rename should be atomic operation, sync on both - * dest and src directories are done because IOUtils.fsync() calls - * directory's channel sync, not the journal itself. - */ - if (fsyncDir && newReplicaInfo instanceof FinalizedReplica - && replicaInfo instanceof LocalReplica) { - FinalizedReplica finalizedReplica = (FinalizedReplica) newReplicaInfo; - finalizedReplica.fsyncDirectory(); - LocalReplica localReplica = (LocalReplica) replicaInfo; - localReplica.fsyncDirectory(); - } if (v.isTransientStorage()) { releaseLockedMemory( replicaInfo.getOriginalBytesReserved() @@ -2634,11 +2636,11 @@ class FsDatasetImpl implements FsDatasetSpi { newReplicaInfo.setNumBytes(newlength); volumeMap.add(bpid, newReplicaInfo.getReplicaInfo()); - finalizeReplica(bpid, newReplicaInfo.getReplicaInfo(), false); + finalizeReplica(bpid, newReplicaInfo.getReplicaInfo()); } } // finalize the block - return finalizeReplica(bpid, rur, false); + return finalizeReplica(bpid, rur); } @Override // FsDatasetSpi