From 54f7a6b127908cebedf44f4a96ee06e12e98f0d6 Mon Sep 17 00:00:00 2001
From: hfutatzhanghb
Date: Mon, 22 Jan 2024 11:50:51 +0800
Subject: [PATCH] HDFS-17293. First packet data + checksum size will be set to
 516 bytes when writing to a new block. (#6368). Contributed by farmmamba.

Reviewed-by: He Xiaoqiao
Signed-off-by: Shuyan Zhang
---
 .../apache/hadoop/hdfs/DFSOutputStream.java   |  9 +++-
 .../hadoop/hdfs/TestDFSOutputStream.java      | 41 +++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index b6634eddc8..a1bfb7f5d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -536,8 +536,13 @@ protected void adjustChunkBoundary() {
     }
 
     if (!getStreamer().getAppendChunk()) {
-      final int psize = (int) Math
-          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      int psize = 0;
+      if (blockSize == getStreamer().getBytesCurBlock()) {
+        psize = writePacketSize;
+      } else {
+        psize = (int) Math
+            .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      }
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 0f1b965cc2..bdb91f91bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -508,6 +510,45 @@ public void testExceptionInCloseWithoutRecoverLease() throws Exception {
     }
   }
 
+  @Test(timeout=60000)
+  public void testFirstPacketSizeInNewBlocks() throws IOException {
+    final long blockSize = (long) 1024 * 1024;
+    MiniDFSCluster dfsCluster = cluster;
+    DistributedFileSystem fs = dfsCluster.getFileSystem();
+    Configuration dfsConf = fs.getConf();
+
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
+    try(FSDataOutputStream fos = fs.create(new Path("/testfile.dat"),
+        FsPermission.getDefault(),
+        flags, 512, (short)3, blockSize, null)) {
+
+      DataChecksum crc32c = DataChecksum.newDataChecksum(
+          DataChecksum.Type.CRC32C, 512);
+
+      long loop = 0;
+      Random r = new Random();
+      byte[] buf = new byte[(int) blockSize];
+      r.nextBytes(buf);
+      fos.write(buf);
+      fos.hflush();
+
+      int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
+      int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) -
+          PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;
+
+      while (loop < 20) {
+        r.nextBytes(buf);
+        fos.write(buf);
+        fos.hflush();
+        loop++;
+        Assert.assertEquals(((DFSOutputStream) fos.getWrappedStream()).packetSize,
+            packetContentSize);
+      }
+    }
+    fs.delete(new Path("/testfile.dat"), true);
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {