From ded304e6a6e74742f6f4a35989f762dcefa234cb Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Tue, 16 Oct 2012 00:55:29 +0000
Subject: [PATCH] HDFS-4049. Fix hflush performance regression due to nagling
 delays. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1398591 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   3 +
 .../protocol/datatransfer/PacketReceiver.java      |  84 ++++++++-----
 .../hadoop/hdfs/TestMultiThreadedHflush.java       | 113 +++++++++++-------
 .../datatransfer/TestPacketReceiver.java           | 106 ++++++++++++++++
 4 files changed, 233 insertions(+), 73 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a56f86516a..d7591b2ec8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -440,6 +440,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
     (Binglin Chang via suresh)
 
+    HDFS-4049. Fix hflush performance regression due to nagling delays
+    (todd)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
index 6f84fe0202..cc2d17974a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
@@ -53,14 +53,8 @@ public class PacketReceiver implements Closeable {
   private final boolean useDirectBuffers;
 
   /**
-   * Internal buffer for reading the length prefixes at the start of
-   * the packet.
-   */
-  private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_LENGTHS_LEN);
-
-  /**
-   * The entirety of the most recently read packet, excepting the
+   * The entirety of the most recently read packet.
+   * The first PKT_LENGTHS_LEN bytes of this buffer are the
    * length prefixes.
    */
   private ByteBuffer curPacketBuf = null;
@@ -82,6 +76,7 @@ public class PacketReceiver implements Closeable {
 
   public PacketReceiver(boolean useDirectBuffers) {
     this.useDirectBuffers = useDirectBuffers;
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
   }
 
   public PacketHeader getHeader() {
@@ -133,11 +128,12 @@ private void doRead(ReadableByteChannel ch, InputStream in)
     //            checksums were not requested
     // DATA       the actual block data
     Preconditions.checkState(curHeader == null ||
        !curHeader.isLastPacketInBlock());
-
-    lengthPrefixBuf.clear();
-    doReadFully(ch, in, lengthPrefixBuf);
-    lengthPrefixBuf.flip();
-    int payloadLen = lengthPrefixBuf.getInt();
+
+    curPacketBuf.clear();
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    int payloadLen = curPacketBuf.getInt();
     if (payloadLen < Ints.BYTES) {
       // The "payload length" includes its own length. Therefore it
@@ -146,7 +142,7 @@ private void doRead(ReadableByteChannel ch, InputStream in)
           payloadLen);
     }
     int dataPlusChecksumLen = payloadLen - Ints.BYTES;
-    int headerLen = lengthPrefixBuf.getShort();
+    int headerLen = curPacketBuf.getShort();
     if (headerLen < 0) {
       throw new IOException("Invalid header length " + headerLen);
     }
@@ -166,13 +162,17 @@ private void doRead(ReadableByteChannel ch, InputStream in)
 
     // Make sure we have space for the whole packet, and
     // read it.
-    reallocPacketBuf(dataPlusChecksumLen + headerLen);
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     curPacketBuf.clear();
-    curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     doReadFully(ch, in, curPacketBuf);
     curPacketBuf.flip();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
 
-    // Extract the header from the front of the buffer.
+    // Extract the header from the front of the buffer (after the length prefixes)
     byte[] headerBuf = new byte[headerLen];
     curPacketBuf.get(headerBuf);
     if (curHeader == null) {
@@ -197,10 +197,6 @@ private void doRead(ReadableByteChannel ch, InputStream in)
   public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
     Preconditions.checkState(!useDirectBuffers,
         "Currently only supported for non-direct buffers");
-    assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
-    mirrorOut.write(lengthPrefixBuf.array(),
-        lengthPrefixBuf.arrayOffset(),
-        lengthPrefixBuf.capacity());
     mirrorOut.write(curPacketBuf.array(),
         curPacketBuf.arrayOffset(),
         curPacketBuf.remaining());
@@ -223,23 +219,36 @@ private static void doReadFully(ReadableByteChannel ch, InputStream in,
 
   private void reslicePacket(
       int headerLen, int checksumsLen, int dataLen) {
+    // Packet structure (refer to doRead() for details):
+    //   PLEN    HLEN      HEADER     CHECKSUMS  DATA
+    //   32-bit  16-bit   <protobuf>  <variable length>
+    //   |--- lenThroughHeader ----|
+    //   |----------- lenThroughChecksums ----|
+    //   |------------------- lenThroughData ------|
+    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+    int lenThroughChecksums = lenThroughHeader + checksumsLen;
+    int lenThroughData = lenThroughChecksums + dataLen;
+
     assert dataLen >= 0 : "invalid datalen: " + dataLen;
-
-    assert curPacketBuf.position() == headerLen;
-    assert checksumsLen + dataLen == curPacketBuf.remaining() :
+    assert curPacketBuf.position() == lenThroughHeader;
+    assert curPacketBuf.limit() == lenThroughData :
       "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" +
       dataLen + " rem=" + curPacketBuf.remaining();
-
-    curPacketBuf.position(headerLen);
-    curPacketBuf.limit(headerLen + checksumsLen);
+
+    // Slice the checksums.
+    curPacketBuf.position(lenThroughHeader);
+    curPacketBuf.limit(lenThroughChecksums);
     curChecksumSlice = curPacketBuf.slice();
 
-    curPacketBuf.position(headerLen + checksumsLen);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    // Slice the data.
+    curPacketBuf.position(lenThroughChecksums);
+    curPacketBuf.limit(lenThroughData);
     curDataSlice = curPacketBuf.slice();
 
+    // Reset buffer to point to the entirety of the packet (including
+    // length prefixes)
     curPacketBuf.position(0);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    curPacketBuf.limit(lenThroughData);
   }
@@ -258,12 +267,21 @@ private void reallocPacketBuf(int atLeastCapacity) {
     // one.
     if (curPacketBuf == null ||
         curPacketBuf.capacity() < atLeastCapacity) {
-      returnPacketBufToPool();
+      ByteBuffer newBuf;
       if (useDirectBuffers) {
-        curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+        newBuf = bufferPool.getBuffer(atLeastCapacity);
       } else {
-        curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+        newBuf = ByteBuffer.allocate(atLeastCapacity);
       }
+      // If reallocing an existing buffer, copy the old packet length
+      // prefixes over
+      if (curPacketBuf != null) {
+        curPacketBuf.flip();
+        newBuf.put(curPacketBuf);
+      }
+
+      returnPacketBufToPool();
+      curPacketBuf = newBuf;
     }
   }
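The PacketReceiver change above is the core of the fix: the 6-byte length prefixes (PLEN and HLEN) now live at the front of curPacketBuf rather than in a separate lengthPrefixBuf, so mirrorPacketTo() forwards a packet with a single write() call instead of two. The sketch below is illustrative only (the class and method names are invented here, not part of the patch). With Nagle's algorithm enabled, a small write followed by another write and then a read can stall: the kernel holds the second small segment until the first is ACKed, and with delayed ACKs on the peer that can cost tens of milliseconds per hflush round trip.

    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical before/after write patterns for mirroring one packet.
    class NaglingSketch {
      // Before HDFS-4049: prefixes and body left in two write() calls.
      // Nagle may hold the second small segment until the first is ACKed.
      static void twoWrites(DataOutputStream mirrorOut,
          byte[] lengthPrefixes, byte[] rest) throws IOException {
        mirrorOut.write(lengthPrefixes);  // 6 bytes on the wire
        mirrorOut.write(rest);            // header + checksums + data
      }

      // After: one contiguous buffer, one write(), nothing for Nagle to delay.
      static void oneWrite(DataOutputStream mirrorOut,
          byte[] wholePacket) throws IOException {
        mirrorOut.write(wholePacket, 0, wholePacket.length);
      }
    }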
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
index 8d320903a8..7a44bd9829 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
@@ -20,45 +20,40 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
+import com.google.common.base.Stopwatch;
+
 /**
  * This class tests hflushing concurrently from many threads.
  */
 public class TestMultiThreadedHflush {
   static final int blockSize = 1024*1024;
-  static final int numBlocks = 10;
-  static final int fileSize = numBlocks * blockSize + 1;
 
   private static final int NUM_THREADS = 10;
   private static final int WRITE_SIZE = 517;
   private static final int NUM_WRITES_PER_THREAD = 1000;
 
   private byte[] toWrite = null;
-
-  {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
-  }
+
+  private final SampleQuantiles quantiles = new SampleQuantiles(
+      new Quantile[] {
+        new Quantile(0.50, 0.050),
+        new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+        new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
 
   /*
    * creates a file but does not close it
@@ -104,8 +99,11 @@ public void run() {
     }
 
     private void doAWrite() throws IOException {
+      Stopwatch sw = new Stopwatch().start();
       stm.write(toWrite);
       stm.hflush();
+      long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+      quantiles.insert(micros);
     }
   }
 
@@ -115,14 +113,28 @@ private void doAWrite() throws IOException {
    * They all finish before the file is closed.
    */
   @Test
-  public void testMultipleHflushers() throws Exception {
+  public void testMultipleHflushersRepl1() throws Exception {
+    doTestMultipleHflushers(1);
+  }
+
+  @Test
+  public void testMultipleHflushersRepl3() throws Exception {
+    doTestMultipleHflushers(3);
+  }
+
+  private void doTestMultipleHflushers(int repl) throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(repl)
+        .build();
     FileSystem fs = cluster.getFileSystem();
     Path p = new Path("/multiple-hflushers.dat");
     try {
-      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+          NUM_WRITES_PER_THREAD, repl);
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          quantiles);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -200,13 +212,13 @@ public void run() {
   }
 
   public void doMultithreadedWrites(
-      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
-      throws Exception {
+      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+      int replication) throws Exception {
     initBuffer(bufferSize);
 
     // create a new file.
     FileSystem fs = p.getFileSystem(conf);
-    FSDataOutputStream stm = createFile(fs, p, 1);
+    FSDataOutputStream stm = createFile(fs, p, replication);
     System.out.println("Created file simpleFlush.dat");
 
     // There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public void doMultithreadedWrites(
   }
 
   public static void main(String args[]) throws Exception {
-    if (args.length != 1) {
-      System.err.println(
-        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
-        " <path to test file> ");
-      System.exit(1);
-    }
-    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
-    Configuration conf = new Configuration();
-    Path p = new Path(args[0]);
-    long st = System.nanoTime();
-    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
-    long et = System.nanoTime();
-
-    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+    System.exit(ToolRunner.run(new CLIBenchmark(), args));
+  }
+
+  private static class CLIBenchmark extends Configured implements Tool {
+    public int run(String args[]) throws Exception {
+      if (args.length != 1) {
+        System.err.println(
+          "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+          " <path to test file> ");
+        System.err.println(
+            "Configurations settable by -D options:\n" +
+            "  num.threads [default 10] - how many threads to run\n" +
+            "  write.size [default 511] - bytes per write\n" +
+            "  num.writes [default 50000] - how many writes to perform");
+        System.exit(1);
+      }
+      TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+      Configuration conf = getConf();
+      Path p = new Path(args[0]);
+
+      int numThreads = conf.getInt("num.threads", 10);
+      int writeSize = conf.getInt("write.size", 511);
+      int numWrites = conf.getInt("num.writes", 50000);
+      int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+      Stopwatch sw = new Stopwatch().start();
+      test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+          replication);
+      sw.stop();
+
+      System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          test.quantiles);
+      return 0;
+    }
   }
-
 }
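The benchmark changes above replace a single wall-clock total with per-operation timing: each hflush() is measured with a Stopwatch and fed into SampleQuantiles, so the regression shows up as a shift in latency percentiles rather than being averaged away. A minimal sketch of that measurement pattern follows; the LatencyProbe class and timeOnce method are invented here for illustration, while the Quantile targets and the old Guava Stopwatch calls mirror what the patch itself uses.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.metrics2.util.Quantile;
    import org.apache.hadoop.metrics2.util.SampleQuantiles;

    import com.google.common.base.Stopwatch;

    // Hypothetical standalone probe using the same pattern as doAWrite().
    class LatencyProbe {
      private final SampleQuantiles quantiles = new SampleQuantiles(
          new Quantile[] {
            new Quantile(0.50, 0.050), new Quantile(0.99, 0.001) });

      void timeOnce(Runnable op) {
        Stopwatch sw = new Stopwatch().start();  // old Guava API, as in the patch
        op.run();
        quantiles.insert(sw.elapsedTime(TimeUnit.MICROSECONDS));
      }

      @Override
      public String toString() {
        // SampleQuantiles renders a human-readable percentile summary,
        // which is what the test relies on when it prints `quantiles`.
        return quantiles.toString();
      }
    }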
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
new file mode 100644
index 0000000000..428a0384b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.*;
+
+public class TestPacketReceiver {
+
+  private static long OFFSET_IN_BLOCK = 12345L;
+  private static int SEQNO = 54321;
+
+  private byte[] prepareFakePacket(byte[] data, byte[] sums) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    int packetLen = data.length + sums.length + 4;
+    PacketHeader header = new PacketHeader(
+        packetLen, OFFSET_IN_BLOCK, SEQNO, false, data.length, false);
+    header.write(dos);
+
+    dos.write(sums);
+    dos.write(data);
+    dos.flush();
+    return baos.toByteArray();
+  }
+
+  private static byte[] remainingAsArray(ByteBuffer buf) {
+    byte[] b = new byte[buf.remaining()];
+    buf.get(b);
+    return b;
+  }
+
+  @Test
+  public void testReceiveAndMirror() throws IOException {
+    PacketReceiver pr = new PacketReceiver(false);
+
+    // Test three different lengths, to force reallocing
+    // the buffer as it grows.
+    doTestReceiveAndMirror(pr, 100, 10);
+    doTestReceiveAndMirror(pr, 50, 10);
+    doTestReceiveAndMirror(pr, 150, 10);
+
+    pr.close();
+  }
+
+  private void doTestReceiveAndMirror(PacketReceiver pr,
+      int dataLen, int checksumsLen) throws IOException {
+    final byte[] DATA = AppendTestUtil.initBuffer(dataLen);
+    final byte[] CHECKSUMS = AppendTestUtil.initBuffer(checksumsLen);
+
+    byte[] packet = prepareFakePacket(DATA, CHECKSUMS);
+    ByteArrayInputStream in = new ByteArrayInputStream(packet);
+
+    pr.receiveNextPacket(in);
+
+    ByteBuffer parsedData = pr.getDataSlice();
+    assertArrayEquals(DATA, remainingAsArray(parsedData));
+
+    ByteBuffer parsedChecksums = pr.getChecksumSlice();
+    assertArrayEquals(CHECKSUMS, remainingAsArray(parsedChecksums));
+
+    PacketHeader header = pr.getHeader();
+    assertEquals(SEQNO, header.getSeqno());
+    assertEquals(OFFSET_IN_BLOCK, header.getOffsetInBlock());
+
+    // Mirror the packet to an output stream and make sure it matches
+    // the packet we sent.
+    ByteArrayOutputStream mirrored = new ByteArrayOutputStream();
+    mirrored = Mockito.spy(mirrored);
+
+    pr.mirrorPacketTo(new DataOutputStream(mirrored));
+    // The write should be done in a single call. Otherwise we may hit
+    // nasty interactions with nagling (eg HDFS-4049).
+    Mockito.verify(mirrored, Mockito.times(1))
+      .write(Mockito.any(), Mockito.anyInt(),
+          Mockito.eq(packet.length));
+    Mockito.verifyNoMoreInteractions(mirrored);
+
+    assertArrayEquals(packet, mirrored.toByteArray());
+  }
+}
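To make the framing checked by prepareFakePacket() and PacketReceiver.reslicePacket() concrete, here is a worked example using the sizes from the first doTestReceiveAndMirror call (dataLen=100, checksumsLen=10). The header length is an assumed placeholder, since the encoded PacketHeader size depends on its protobuf encoding; the prefix sizes follow from the "32-bit PLEN, 16-bit HLEN" layout in the patch comment.

    // Illustrative arithmetic only; headerLen is an assumed value.
    public class PacketLengthExample {
      public static void main(String[] args) {
        int dataLen = 100;      // from doTestReceiveAndMirror(pr, 100, 10)
        int checksumsLen = 10;
        int headerLen = 25;     // hypothetical encoded PacketHeader size

        // PLEN counts the checksums, the data, and its own 4 bytes,
        // matching "data.length + sums.length + 4" in prepareFakePacket().
        int plen = 4 + checksumsLen + dataLen;                     // 114

        // PKT_LENGTHS_LEN = 4 (PLEN, int) + 2 (HLEN, short) = 6.
        int lenThroughHeader = 6 + headerLen;                      // 31
        int lenThroughChecksums = lenThroughHeader + checksumsLen; // 41
        int lenThroughData = lenThroughChecksums + dataLen;        // 141

        // lenThroughData is the size of the whole packet: exactly what
        // mirrorPacketTo() now forwards in its single write() call.
        System.out.println(plen + " / " + lenThroughData);
      }
    }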