HDFS-4049. Fix hflush performance regression due to nagling delays. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1398591 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon
Date:   2012-10-16 00:55:29 +00:00
Parent: 73e9366510
Commit: ded304e6a6

4 changed files with 233 additions and 73 deletions
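In short: PacketReceiver previously read the packet length prefixes into a small buffer separate from the packet body, so mirroring a packet downstream cost two socket writes, a 6-byte one and then a large one. Two small back-to-back writes interact badly with Nagle's algorithm and delayed ACKs: the second write can sit in the send buffer until the first is acknowledged, which can stall each hflush by tens of milliseconds. The fix below keeps the prefixes at the front of the single packet buffer, making the mirror one contiguous write. A before/after sketch of the write pattern (illustrative code, not from this commit):

    import java.io.DataOutputStream;
    import java.io.IOException;

    class MirrorWriteSketch {
      // Before: two writes per packet; the second can be delayed by nagling.
      static void mirrorTwoWrites(DataOutputStream out, byte[] prefixes,
          byte[] body) throws IOException {
        out.write(prefixes);  // 6 bytes: payload length + header length
        out.write(body);      // header + checksums + data
      }

      // After: prefixes live at the front of the packet buffer, so the
      // mirror is a single write and nagling has nothing to delay.
      static void mirrorOneWrite(DataOutputStream out, byte[] wholePacket)
          throws IOException {
        out.write(wholePacket, 0, wholePacket.length);
      }
    }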

CHANGES.txt

@@ -440,6 +440,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
     (Binglin Chang via suresh)
 
+    HDFS-4049. Fix hflush performance regression due to nagling delays
+    (todd)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES

PacketReceiver.java

@@ -53,14 +53,8 @@ public class PacketReceiver implements Closeable {
   private final boolean useDirectBuffers;
 
   /**
-   * Internal buffer for reading the length prefixes at the start of
-   * the packet.
-   */
-  private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_LENGTHS_LEN);
-
-  /**
-   * The entirety of the most recently read packet, excepting the
+   * The entirety of the most recently read packet.
+   * The first PKT_LENGTHS_LEN bytes of this buffer are the
    * length prefixes.
    */
   private ByteBuffer curPacketBuf = null;

@@ -82,6 +76,7 @@ public class PacketReceiver implements Closeable {
 
   public PacketReceiver(boolean useDirectBuffers) {
     this.useDirectBuffers = useDirectBuffers;
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
   }
 
   public PacketHeader getHeader() {
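For orientation: PacketHeader.PKT_LENGTHS_LEN is the 6-byte prefix region, a 4-byte payload length plus a 2-byte header length (matching the getInt() and getShort() reads in doRead() below). Pre-allocating that region in the constructor guarantees curPacketBuf is non-null before the first prefix read.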
@@ -133,11 +128,12 @@ private void doRead(ReadableByteChannel ch, InputStream in)
     //            checksums were not requested
     // DATA       the actual block data
     Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
 
-    lengthPrefixBuf.clear();
-    doReadFully(ch, in, lengthPrefixBuf);
-    lengthPrefixBuf.flip();
-    int payloadLen = lengthPrefixBuf.getInt();
+    curPacketBuf.clear();
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+    doReadFully(ch, in, curPacketBuf);
+    curPacketBuf.flip();
+    int payloadLen = curPacketBuf.getInt();
 
     if (payloadLen < Ints.BYTES) {
       // The "payload length" includes its own length. Therefore it

@@ -146,7 +142,7 @@ private void doRead(ReadableByteChannel ch, InputStream in)
           payloadLen);
     }
     int dataPlusChecksumLen = payloadLen - Ints.BYTES;
-    int headerLen = lengthPrefixBuf.getShort();
+    int headerLen = curPacketBuf.getShort();
     if (headerLen < 0) {
       throw new IOException("Invalid header length " + headerLen);
     }

@@ -166,13 +162,17 @@ private void doRead(ReadableByteChannel ch, InputStream in)
     // Make sure we have space for the whole packet, and
     // read it.
-    reallocPacketBuf(dataPlusChecksumLen + headerLen);
+    reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     curPacketBuf.clear();
-    curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+    curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+        dataPlusChecksumLen + headerLen);
     doReadFully(ch, in, curPacketBuf);
     curPacketBuf.flip();
+    curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
 
-    // Extract the header from the front of the buffer.
+    // Extract the header from the front of the buffer (after the length prefixes)
     byte[] headerBuf = new byte[headerLen];
     curPacketBuf.get(headerBuf);
     if (curHeader == null) {
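To see the new single-buffer read flow end to end, here is a self-contained sketch of the same pattern (hypothetical names; it assumes the buffer is already large enough, which reallocPacketBuf() guarantees in the real code):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class SingleBufferReadSketch {
      static final int PREFIX_LEN = 6;  // 32-bit PLEN + 16-bit HLEN

      static ByteBuffer readPacket(ReadableByteChannel ch, ByteBuffer buf)
          throws IOException {
        buf.clear();
        buf.limit(PREFIX_LEN);            // first, read exactly the prefixes
        readFully(ch, buf);
        buf.flip();
        int payloadLen = buf.getInt();    // includes its own 4 bytes
        int headerLen = buf.getShort();
        int rest = (payloadLen - 4) + headerLen;

        buf.clear();
        buf.position(PREFIX_LEN);         // keep the prefixes in place
        buf.limit(PREFIX_LEN + rest);     // then read the rest in behind them
        readFully(ch, buf);
        buf.flip();
        buf.position(PREFIX_LEN);         // expose header + checksums + data
        return buf;
      }

      private static void readFully(ReadableByteChannel ch, ByteBuffer buf)
          throws IOException {
        while (buf.hasRemaining()) {
          if (ch.read(buf) < 0) {
            throw new IOException("premature EOF");
          }
        }
      }
    }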
@@ -197,10 +197,6 @@ private void doRead(ReadableByteChannel ch, InputStream in)
   public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
     Preconditions.checkState(!useDirectBuffers,
         "Currently only supported for non-direct buffers");
-    assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
-    mirrorOut.write(lengthPrefixBuf.array(),
-        lengthPrefixBuf.arrayOffset(),
-        lengthPrefixBuf.capacity());
     mirrorOut.write(curPacketBuf.array(),
         curPacketBuf.arrayOffset(),
         curPacketBuf.remaining());
@@ -223,23 +219,36 @@ private static void doReadFully(ReadableByteChannel ch, InputStream in,
   private void reslicePacket(
       int headerLen, int checksumsLen, int dataLen) {
+    // Packet structure (refer to doRead() for details):
+    //   PLEN    HLEN       HEADER      CHECKSUMS  DATA
+    //   32-bit  16-bit     <protobuf>  <variable length>
+    //   |--- lenThroughHeader ----|
+    //   |----------- lenThroughChecksums ----|
+    //   |------------------- lenThroughData ------|
+    int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+    int lenThroughChecksums = lenThroughHeader + checksumsLen;
+    int lenThroughData = lenThroughChecksums + dataLen;
+
     assert dataLen >= 0 : "invalid datalen: " + dataLen;
-
-    assert curPacketBuf.position() == headerLen;
-    assert checksumsLen + dataLen == curPacketBuf.remaining() :
+    assert curPacketBuf.position() == lenThroughHeader;
+    assert curPacketBuf.limit() == lenThroughData :
       "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
       " rem=" + curPacketBuf.remaining();
 
-    curPacketBuf.position(headerLen);
-    curPacketBuf.limit(headerLen + checksumsLen);
+    // Slice the checksums.
+    curPacketBuf.position(lenThroughHeader);
+    curPacketBuf.limit(lenThroughChecksums);
     curChecksumSlice = curPacketBuf.slice();
 
-    curPacketBuf.position(headerLen + checksumsLen);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    // Slice the data.
+    curPacketBuf.position(lenThroughChecksums);
+    curPacketBuf.limit(lenThroughData);
     curDataSlice = curPacketBuf.slice();
 
+    // Reset buffer to point to the entirety of the packet (including
+    // length prefixes)
     curPacketBuf.position(0);
-    curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+    curPacketBuf.limit(lenThroughData);
   }
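The position/limit/slice sequence above is compact but easy to misread: slice() creates a view that shares the backing storage, so the checksum and data slices cost no copying. A standalone demonstration with toy sizes (not HDFS code):

    import java.nio.ByteBuffer;

    class SliceSketch {
      public static void main(String[] args) {
        ByteBuffer pkt = ByteBuffer.allocate(16);   // pretend packet:
        for (int i = 0; i < 16; i++) {              // 6B prefixes, 4B header,
          pkt.put((byte) i);                        // 3B checksums, 3B data
        }

        int lenThroughHeader = 6 + 4;
        int lenThroughChecksums = lenThroughHeader + 3;  // = 13
        int lenThroughData = lenThroughChecksums + 3;    // = 16

        pkt.position(lenThroughHeader);
        pkt.limit(lenThroughChecksums);
        ByteBuffer checksums = pkt.slice();   // view of bytes 10..12

        pkt.position(lenThroughChecksums);
        pkt.limit(lenThroughData);
        ByteBuffer data = pkt.slice();        // view of bytes 13..15

        pkt.position(0);                      // restore the full-packet window
        pkt.limit(lenThroughData);

        System.out.println(checksums.remaining() + " checksum bytes, "
            + data.remaining() + " data bytes");   // 3 and 3
      }
    }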
@@ -258,12 +267,21 @@ private void reallocPacketBuf(int atLeastCapacity) {
     // one.
     if (curPacketBuf == null ||
         curPacketBuf.capacity() < atLeastCapacity) {
-      returnPacketBufToPool();
+      ByteBuffer newBuf;
       if (useDirectBuffers) {
-        curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+        newBuf = bufferPool.getBuffer(atLeastCapacity);
       } else {
-        curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+        newBuf = ByteBuffer.allocate(atLeastCapacity);
       }
+
+      // If reallocing an existing buffer, copy the old packet length
+      // prefixes over
+      if (curPacketBuf != null) {
+        curPacketBuf.flip();
+        newBuf.put(curPacketBuf);
+      }
+
+      returnPacketBufToPool();
+      curPacketBuf = newBuf;
     }
   }
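The subtle point in the realloc path is ordering: the length prefixes already read into the old buffer have to survive the swap, so the copy happens before the old buffer is returned to the pool. A minimal sketch of the grow-and-copy step, assuming a plain heap buffer (the real code may instead draw the replacement from a direct-buffer pool):

    import java.nio.ByteBuffer;

    class GrowCopySketch {
      static ByteBuffer grow(ByteBuffer old, int atLeastCapacity) {
        if (old != null && old.capacity() >= atLeastCapacity) {
          return old;                 // still big enough: reuse as-is
        }
        ByteBuffer newBuf = ByteBuffer.allocate(atLeastCapacity);
        if (old != null) {
          old.flip();                 // bytes written so far: [0, position)
          newBuf.put(old);            // carry them to the front of newBuf
        }
        return newBuf;                // caller then releases `old` to its pool
      }
    }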

TestMultiThreadedHflush.java

@@ -20,45 +20,40 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 
+import com.google.common.base.Stopwatch;
+
 /**
  * This class tests hflushing concurrently from many threads.
  */
 public class TestMultiThreadedHflush {
   static final int blockSize = 1024*1024;
-  static final int numBlocks = 10;
-  static final int fileSize = numBlocks * blockSize + 1;
 
   private static final int NUM_THREADS = 10;
   private static final int WRITE_SIZE = 517;
   private static final int NUM_WRITES_PER_THREAD = 1000;
 
   private byte[] toWrite = null;
-
-  {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
-  }
+  private final SampleQuantiles quantiles = new SampleQuantiles(
+      new Quantile[] {
+        new Quantile(0.50, 0.050),
+        new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+        new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
 
   /*
    * creates a file but does not close it
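Each Quantile(q, e) entry above asks the estimator for quantile q with a rank error of at most e, so new Quantile(0.99, 0.001) tracks the 99th percentile to within 0.1%. A minimal usage sketch of the same API (standalone; sample values invented for illustration):

    import org.apache.hadoop.metrics2.util.Quantile;
    import org.apache.hadoop.metrics2.util.SampleQuantiles;

    class QuantilesSketch {
      public static void main(String[] args) {
        SampleQuantiles latencies = new SampleQuantiles(new Quantile[] {
            new Quantile(0.50, 0.050),     // median, rank error <= 5%
            new Quantile(0.99, 0.001) });  // p99, rank error <= 0.1%
        for (long micros : new long[] { 900, 1100, 1300, 40000 }) {
          latencies.insert(micros);        // one hflush latency sample
        }
        // toString() renders the current estimates, which is what the
        // test's println relies on.
        System.out.println(latencies);
      }
    }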
@@ -104,8 +99,11 @@ public void run() {
     }
 
     private void doAWrite() throws IOException {
+      Stopwatch sw = new Stopwatch().start();
       stm.write(toWrite);
       stm.hflush();
+      long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+      quantiles.insert(micros);
     }
   }
@@ -115,14 +113,28 @@ private void doAWrite() throws IOException {
    * They all finish before the file is closed.
    */
   @Test
-  public void testMultipleHflushers() throws Exception {
+  public void testMultipleHflushersRepl1() throws Exception {
+    doTestMultipleHflushers(1);
+  }
+
+  @Test
+  public void testMultipleHflushersRepl3() throws Exception {
+    doTestMultipleHflushers(3);
+  }
+
+  private void doTestMultipleHflushers(int repl) throws Exception {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(repl)
+        .build();
 
     FileSystem fs = cluster.getFileSystem();
     Path p = new Path("/multiple-hflushers.dat");
     try {
-      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+          NUM_WRITES_PER_THREAD, repl);
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          quantiles);
     } finally {
       fs.close();
       cluster.shutdown();
@@ -200,13 +212,13 @@ public void run() {
   }
 
   public void doMultithreadedWrites(
-      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
-      throws Exception {
+      Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+      int replication) throws Exception {
     initBuffer(bufferSize);
 
     // create a new file.
     FileSystem fs = p.getFileSystem(conf);
-    FSDataOutputStream stm = createFile(fs, p, 1);
+    FSDataOutputStream stm = createFile(fs, p, replication);
     System.out.println("Created file simpleFlush.dat");
 
     // There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public void doMultithreadedWrites(
   }
 
   public static void main(String args[]) throws Exception {
-    if (args.length != 1) {
-      System.err.println(
-        "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
-        " <path to test file> ");
-      System.exit(1);
-    }
-    TestMultiThreadedHflush test = new TestMultiThreadedHflush();
-    Configuration conf = new Configuration();
-    Path p = new Path(args[0]);
-    long st = System.nanoTime();
-    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
-    long et = System.nanoTime();
-
-    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+    System.exit(ToolRunner.run(new CLIBenchmark(), args));
+  }
+
+  private static class CLIBenchmark extends Configured implements Tool {
+    public int run(String args[]) throws Exception {
+      if (args.length != 1) {
+        System.err.println(
+          "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+          " <path to test file> ");
+        System.err.println(
+            "Configurations settable by -D options:\n" +
+            "  num.threads [default 10] - how many threads to run\n" +
+            "  write.size [default 511] - bytes per write\n" +
+            "  num.writes [default 50000] - how many writes to perform");
+        System.exit(1);
+      }
+      TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+      Configuration conf = getConf();
+      Path p = new Path(args[0]);
+      int numThreads = conf.getInt("num.threads", 10);
+      int writeSize = conf.getInt("write.size", 511);
+      int numWrites = conf.getInt("num.writes", 50000);
+      int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+      Stopwatch sw = new Stopwatch().start();
+      test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+          replication);
+      sw.stop();
+
+      System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+      System.out.println("Latency quantiles (in microseconds):\n" +
+          test.quantiles);
+      return 0;
+    }
   }
 }
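Because the benchmark now runs through ToolRunner, generic -D options are parsed into the Configuration before run() executes, so the knobs are settable from the command line. An illustrative invocation (exact launcher and classpath depend on how the test jar is built and deployed):

    hadoop org.apache.hadoop.hdfs.TestMultiThreadedHflush \
        -Dnum.threads=20 -Dwrite.size=511 -Dnum.writes=50000 /bench/hflush.dat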

TestPacketReceiver.java (new file)

@@ -0,0 +1,106 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.protocol.datatransfer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.AppendTestUtil;
import org.junit.Test;
import org.mockito.Mockito;

import static org.junit.Assert.*;

public class TestPacketReceiver {

  private static long OFFSET_IN_BLOCK = 12345L;
  private static int SEQNO = 54321;

  private byte[] prepareFakePacket(byte[] data, byte[] sums) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);

    int packetLen = data.length + sums.length + 4;
    PacketHeader header = new PacketHeader(
        packetLen, OFFSET_IN_BLOCK, SEQNO, false, data.length, false);
    header.write(dos);

    dos.write(sums);
    dos.write(data);
    dos.flush();
    return baos.toByteArray();
  }

  private static byte[] remainingAsArray(ByteBuffer buf) {
    byte[] b = new byte[buf.remaining()];
    buf.get(b);
    return b;
  }

  @Test
  public void testReceiveAndMirror() throws IOException {
    PacketReceiver pr = new PacketReceiver(false);

    // Test three different lengths, to force reallocing
    // the buffer as it grows.
    doTestReceiveAndMirror(pr, 100, 10);
    doTestReceiveAndMirror(pr, 50, 10);
    doTestReceiveAndMirror(pr, 150, 10);

    pr.close();
  }

  private void doTestReceiveAndMirror(PacketReceiver pr,
      int dataLen, int checksumsLen) throws IOException {
    final byte[] DATA = AppendTestUtil.initBuffer(dataLen);
    final byte[] CHECKSUMS = AppendTestUtil.initBuffer(checksumsLen);

    byte[] packet = prepareFakePacket(DATA, CHECKSUMS);
    ByteArrayInputStream in = new ByteArrayInputStream(packet);
    pr.receiveNextPacket(in);

    ByteBuffer parsedData = pr.getDataSlice();
    assertArrayEquals(DATA, remainingAsArray(parsedData));

    ByteBuffer parsedChecksums = pr.getChecksumSlice();
    assertArrayEquals(CHECKSUMS, remainingAsArray(parsedChecksums));

    PacketHeader header = pr.getHeader();
    assertEquals(SEQNO, header.getSeqno());
    assertEquals(OFFSET_IN_BLOCK, header.getOffsetInBlock());

    // Mirror the packet to an output stream and make sure it matches
    // the packet we sent.
    ByteArrayOutputStream mirrored = new ByteArrayOutputStream();
    mirrored = Mockito.spy(mirrored);

    pr.mirrorPacketTo(new DataOutputStream(mirrored));
    // The write should be done in a single call. Otherwise we may hit
    // nasty interactions with nagling (eg HDFS-4049).
    Mockito.verify(mirrored, Mockito.times(1))
      .write(Mockito.<byte[]>any(), Mockito.anyInt(),
          Mockito.eq(packet.length));
    Mockito.verifyNoMoreInteractions(mirrored);

    assertArrayEquals(packet, mirrored.toByteArray());
  }
}
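The Mockito.times(1) verification is the real regression guard here: the final assertArrayEquals would still pass if mirrorPacketTo() split the packet across several write() calls, since the mirrored bytes would compare equal either way. Only the call-count check catches a return to the two-write pattern that caused the nagling stalls in the first place.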