HDFS-13968. BlockReceiver Array-Based Queue. Contributed by BELUGA BEHR.

This commit is contained in:
Inigo Goiri 2018-10-11 13:49:39 -07:00
parent eb34b5f8af
commit 3532aa3886

View File

@ -28,8 +28,9 @@
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedList; import java.util.Queue;
import java.util.zip.Checksum; import java.util.zip.Checksum;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -1210,7 +1211,7 @@ private enum PacketResponderType {
*/ */
class PacketResponder implements Runnable, Closeable { class PacketResponder implements Runnable, Closeable {
/** queue for packets waiting for ack - synchronization using monitor lock */ /** queue for packets waiting for ack - synchronization using monitor lock */
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); private final Queue<Packet> ackQueue = new ArrayDeque<>();
/** the thread that spawns this responder */ /** the thread that spawns this responder */
private final Thread receiverThread = Thread.currentThread(); private final Thread receiverThread = Thread.currentThread();
/** is this responder running? - synchronization using monitor lock */ /** is this responder running? - synchronization using monitor lock */
@ -1264,12 +1265,10 @@ void enqueue(final long seqno, final boolean lastPacketInBlock,
final long offsetInBlock, final Status ackStatus) { final long offsetInBlock, final Status ackStatus) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
System.nanoTime(), ackStatus); System.nanoTime(), ackStatus);
if(LOG.isDebugEnabled()) { LOG.debug("{}: enqueue {}", this, p);
LOG.debug(myString + ": enqueue " + p); synchronized (ackQueue) {
}
synchronized(ackQueue) {
if (running) { if (running) {
ackQueue.addLast(p); ackQueue.add(p);
ackQueue.notifyAll(); ackQueue.notifyAll();
} }
} }
@ -1321,15 +1320,13 @@ void sendOOBResponse(final Status ackStatus) throws IOException,
/** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */ /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
Packet waitForAckHead(long seqno) throws InterruptedException { Packet waitForAckHead(long seqno) throws InterruptedException {
synchronized(ackQueue) { synchronized (ackQueue) {
while (isRunning() && ackQueue.size() == 0) { while (isRunning() && ackQueue.isEmpty()) {
if (LOG.isDebugEnabled()) { LOG.debug("{}: seqno={} waiting for local datanode to finish write.",
LOG.debug(myString + ": seqno=" + seqno + myString, seqno);
" waiting for local datanode to finish write.");
}
ackQueue.wait(); ackQueue.wait();
} }
return isRunning() ? ackQueue.getFirst() : null; return isRunning() ? ackQueue.element() : null;
} }
} }
@ -1338,8 +1335,8 @@ Packet waitForAckHead(long seqno) throws InterruptedException {
*/ */
@Override @Override
public void close() { public void close() {
synchronized(ackQueue) { synchronized (ackQueue) {
while (isRunning() && ackQueue.size() != 0) { while (isRunning() && !ackQueue.isEmpty()) {
try { try {
ackQueue.wait(); ackQueue.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -1347,14 +1344,12 @@ public void close() {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
if(LOG.isDebugEnabled()) { LOG.debug("{}: closing", this);
LOG.debug(myString + ": closing");
}
running = false; running = false;
ackQueue.notifyAll(); ackQueue.notifyAll();
} }
synchronized(this) { synchronized (this) {
running = false; running = false;
notifyAll(); notifyAll();
} }
@ -1661,8 +1656,8 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
* This should be called only when the ack queue is not empty * This should be called only when the ack queue is not empty
*/ */
private void removeAckHead() { private void removeAckHead() {
synchronized(ackQueue) { synchronized (ackQueue) {
ackQueue.removeFirst(); ackQueue.remove();
ackQueue.notifyAll(); ackQueue.notifyAll();
} }
} }