diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 3ff5c75466..268007f053 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -175,8 +175,13 @@ class BlockSender implements java.io.Closeable {
    * See {{@link BlockSender#isLongRead()}
    */
   private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
-
+  // The number of bytes per checksum here determines the alignment
+  // of reads: we always start reading at a checksum chunk boundary,
+  // even if the checksum type is NULL. So, choosing too big of a value
+  // would risk sending too much unnecessary data. 512 (1 disk sector)
+  // is likely to result in minimal extra IO.
+  private static final long CHUNK_SIZE = 512;

   /**
    * Constructor
    *
@@ -250,18 +255,16 @@ class BlockSender implements java.io.Closeable {
       try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
-        if (replica instanceof FinalizedReplica) {
-          // Load last checksum in case the replica is being written
-          // concurrently
-          final FinalizedReplica frep = (FinalizedReplica) replica;
-          chunkChecksum = frep.getLastChecksumAndDataLen();
-        }
       }
       if (replica.getState() == ReplicaState.RBW) {
         final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
+      if (replica instanceof FinalizedReplica) {
+        chunkChecksum = getPartialChunkChecksumForFinalized(
+            (FinalizedReplica)replica);
+      }

       if (replica.getGenerationStamp() < block.getGenerationStamp()) {
         throw new IOException("Replica gen stamp < block genstamp, block="
@@ -348,12 +351,8 @@ class BlockSender implements java.io.Closeable {
         }
       }
       if (csum == null) {
-        // The number of bytes per checksum here determines the alignment
-        // of reads: we always start reading at a checksum chunk boundary,
-        // even if the checksum type is NULL. So, choosing too big of a value
-        // would risk sending too much unnecessary data. 512 (1 disk sector)
-        // is likely to result in minimal extra IO.
-        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
+        csum = DataChecksum.newDataChecksum(DataChecksum.Type.NULL,
+            (int)CHUNK_SIZE);
       }

       /*
@@ -427,6 +426,37 @@
     }
   }

+  private ChunkChecksum getPartialChunkChecksumForFinalized(
+      FinalizedReplica finalized) throws IOException {
+    // There are a number of places in the code base where a finalized replica
+    // object is created. If last partial checksum is loaded whenever a
+    // finalized replica is created, it would increase latency in DataNode
+    // initialization. Therefore, the last partial chunk checksum is loaded
+    // lazily.
+
+    // Load last checksum in case the replica is being written concurrently
+    final long replicaVisibleLength = replica.getVisibleLength();
+    if (replicaVisibleLength % CHUNK_SIZE != 0 &&
+        finalized.getLastPartialChunkChecksum() == null) {
+      // the finalized replica does not have precomputed last partial
+      // chunk checksum. Recompute now.
+      try {
+        finalized.loadLastPartialChunkChecksum();
+        return new ChunkChecksum(finalized.getVisibleLength(),
+            finalized.getLastPartialChunkChecksum());
+      } catch (FileNotFoundException e) {
+        // meta file is lost. Continue anyway to preserve existing behavior.
+        DataNode.LOG.warn(
+            "meta file " + finalized.getMetaFile() + " is missing!");
+        return null;
+      }
+    } else {
+      // If the checksum is null, BlockSender will use on-disk checksum.
+      return new ChunkChecksum(finalized.getVisibleLength(),
+          finalized.getLastPartialChunkChecksum());
+    }
+  }
+
   /**
    * close opened files.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index e3e045062d..b6212bebc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;

 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;

 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,9 +29,9 @@
  * This class describes a replica that has been finalized.
  */
 public class FinalizedReplica extends LocalReplica {
-
+  private byte[] lastPartialChunkChecksum;
   /**
-   * Constructor
+   * Constructor.
    * @param blockId block id
    * @param len replica length
    * @param genStamp replica generation stamp
@@ -41,9 +40,24 @@ public class FinalizedReplica extends LocalReplica {
    */
   public FinalizedReplica(long blockId, long len, long genStamp,
       FsVolumeSpi vol, File dir) {
-    super(blockId, len, genStamp, vol, dir);
+    this(blockId, len, genStamp, vol, dir, null);
   }
-
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, byte[] checksum) {
+    super(blockId, len, genStamp, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
+  }
+
   /**
    * Constructor
    * @param block a block
@@ -51,7 +65,20 @@ public FinalizedReplica(long blockId, long len, long genStamp,
    * @param dir directory path where block and meta files are located
    */
   public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block, vol, dir, null);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param checksum the last partial chunk checksum
+   */
+  public FinalizedReplica(Block block, FsVolumeSpi vol, File dir,
+      byte[] checksum) {
     super(block, vol, dir);
+    this.setLastPartialChunkChecksum(checksum);
   }

   /**
@@ -60,6 +87,7 @@ public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
    */
   public FinalizedReplica(FinalizedReplica from) {
     super(from);
+    this.setLastPartialChunkChecksum(from.getLastPartialChunkChecksum());
   }

   @Override // ReplicaInfo
@@ -116,30 +144,18 @@ public ReplicaRecoveryInfo createInfo() {
         " does not support createInfo");
   }

-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum.
-   * Note, need to be called with the FsDataset lock acquired. May improve to
-   * lock only the FsVolume in the future.
-   * @throws IOException
-   */
-  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
-    ChunkChecksum chunkChecksum = null;
-    try {
-      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
-          getBlockFile(), getMetaFile());
-      if (lastChecksum != null) {
-        chunkChecksum =
-            new ChunkChecksum(getVisibleLength(), lastChecksum);
-      }
-    } catch (FileNotFoundException e) {
-      // meta file is lost. Try to continue anyway.
-      DataNode.LOG.warn("meta file " + getMetaFile() +
-          " is missing!");
-    } catch (IOException ioe) {
-      DataNode.LOG.warn("Unable to read checksum from meta file " +
-          getMetaFile(), ioe);
-    }
-    return chunkChecksum;
+  public byte[] getLastPartialChunkChecksum() {
+    return lastPartialChunkChecksum;
+  }
+
+  public void setLastPartialChunkChecksum(byte[] checksum) {
+    lastPartialChunkChecksum = checksum;
+  }
+
+  public void loadLastPartialChunkChecksum()
+      throws IOException {
+    byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+        getBlockFile(), getMetaFile());
+    setLastPartialChunkChecksum(lastChecksum);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index 2c55e73ad4..d198197d67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -46,6 +46,7 @@ public class ReplicaBuilder {
   private Thread writer;
   private long recoveryId;
   private Block block;
+  private byte[] lastPartialChunkChecksum;

   private ReplicaInfo fromReplica;

@@ -178,6 +179,11 @@ public ReplicaBuilder setPathHandle(PathHandle pathHandle) {
     return this;
   }

+  public ReplicaBuilder setLastPartialChunkChecksum(byte[] checksum) {
+    this.lastPartialChunkChecksum = checksum;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -258,10 +264,11 @@ private LocalReplica buildFinalizedReplica() throws IllegalArgumentException {
           + "state: " + fromReplica.getState());
     } else {
       if (null != block) {
-        return new FinalizedReplica(block, volume, directoryUsed);
+        return new FinalizedReplica(block, volume, directoryUsed,
+            lastPartialChunkChecksum);
       } else {
         return new FinalizedReplica(blockId, length, genStamp, volume,
-            directoryUsed);
+            directoryUsed, lastPartialChunkChecksum);
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8e7884d70a..c1412930a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1724,6 +1724,7 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
         replicaInfo.getOriginalReplica().getState() == ReplicaState.FINALIZED) {
       newReplicaInfo = replicaInfo.getOriginalReplica();
+      ((FinalizedReplica)newReplicaInfo).loadLastPartialChunkChecksum();
     } else {
       FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
       if (v == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 319bc0efe2..b8c95a44cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -51,6 +51,8 @@
 import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -950,10 +952,22 @@ ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
       long bytesReserved) throws IOException {
     releaseReservedSpace(bytesReserved);
     File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
+    byte[] checksum = null;
+    // copy the last partial checksum if the replica is originally
+    // in finalized or rbw state.
+    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+      FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
+      checksum = finalized.getLastPartialChunkChecksum();
+    } else if (replicaInfo.getState() == ReplicaState.RBW) {
+      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      checksum = rbw.getLastChecksumAndDataLen().getChecksum();
+    }
+
     return new ReplicaBuilder(ReplicaState.FINALIZED)
         .setBlock(replicaInfo)
         .setFsVolume(this)
         .setDirectoryToUse(dest.getParentFile())
+        .setLastPartialChunkChecksum(checksum)
         .build();
   }

@@ -1183,12 +1197,11 @@ public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
         .setBytesToReserve(bytesReserved)
         .buildLocalReplicaInPipeline();

+    // Only a finalized replica can be appended.
+    FinalizedReplica finalized = (FinalizedReplica)replicaInfo;
     // load last checksum and datalen
-    LocalReplica localReplica = (LocalReplica)replicaInfo;
-    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
-        localReplica.getBlockFile(), localReplica.getMetaFile());
     newReplicaInfo.setLastChecksumAndDataLen(
-        replicaInfo.getNumBytes(), lastChunkChecksum);
+        finalized.getVisibleLength(), finalized.getLastPartialChunkChecksum());

     // rename meta file to rbw directory
     // rename block file to rbw directory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index c6948547e2..107decfdea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2986,6 +2986,29 @@ public static File getBlockFile(File storageDir, ExtendedBlock blk) {
         blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
   }

+  /**
+   * Return all block files in given directory (recursive search).
+   */
+  public static List<File> getAllBlockFiles(File storageDir) {
+    List<File> results = new ArrayList<>();
+    File[] files = storageDir.listFiles();
+    if (files == null) {
+      return null;
+    }
+    for (File f : files) {
+      if (f.getName().startsWith(Block.BLOCK_FILE_PREFIX) &&
+          !f.getName().endsWith(Block.METADATA_EXTENSION)) {
+        results.add(f);
+      } else if (f.isDirectory()) {
+        List<File> subdirResults = getAllBlockFiles(f);
+        if (subdirResults != null) {
+          results.addAll(subdirResults);
+        }
+      }
+    }
+    return results;
+  }
+
   /**
    * Get the latest metadata file correpsonding to a block
    * @param storageDir storage directory
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 0b273dfa0a..1f31bdc88e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -92,7 +92,7 @@ public void testListCorruptFilesCorruptedBlock() throws Exception {
       File storageDir = cluster.getInstanceStorageDir(0, 1);
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an " +
           "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
@@ -172,7 +172,7 @@ public void testListCorruptFileBlocksInSafeMode() throws Exception {
       File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
           cluster.getNamesystem().getBlockPoolId());
       assertTrue("data directory does not exist", data_dir.exists());
-      List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+      List<File> metaFiles = MiniDFSCluster.getAllBlockFiles(data_dir);
       assertTrue("Data directory does not contain any blocks or there was an " +
           "IO error", metaFiles != null && !metaFiles.isEmpty());
       File metaFile = metaFiles.get(0);
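
Illustrative usage (a sketch, not part of the patch): the test hunks above switch TestListCorruptFileBlocks from getAllBlockMetadataFiles to the new MiniDFSCluster.getAllBlockFiles helper, so the files they pick up and corrupt are block files rather than .meta files. The snippet below shows how a test might locate and corrupt a block file with that helper; the class and method names (BlockFileCorruptionSketch, corruptFirstBlockFile) and the assumption of an already-running MiniDFSCluster with a known block pool id are for illustration only.

// A minimal sketch, assuming a running MiniDFSCluster ("cluster") and a valid
// block pool id ("bpid"); corruptFirstBlockFile is a hypothetical helper name.
import java.io.File;
import java.io.RandomAccessFile;
import java.util.List;

import org.apache.hadoop.hdfs.MiniDFSCluster;

public class BlockFileCorruptionSketch {
  static void corruptFirstBlockFile(MiniDFSCluster cluster, String bpid)
      throws Exception {
    // Locate the finalized directory of DataNode 0, storage volume 1,
    // the same way the tests in this patch do.
    File storageDir = cluster.getInstanceStorageDir(0, 1);
    File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
    // Helper added by this patch: recursively collects block files,
    // skipping ".meta" files.
    List<File> blockFiles = MiniDFSCluster.getAllBlockFiles(dataDir);
    if (blockFiles == null || blockFiles.isEmpty()) {
      throw new IllegalStateException("no block files under " + dataDir);
    }
    // Overwrite the first few bytes of one block file to simulate corruption.
    try (RandomAccessFile raf = new RandomAccessFile(blockFiles.get(0), "rw")) {
      raf.seek(0);
      raf.write(new byte[] {0x0, 0x1, 0x2, 0x3});
    }
  }
}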