diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index afb9e2afb0..cb1f73d2fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -28,8 +28,9 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; +import java.util.ArrayDeque; import java.util.Arrays; -import java.util.LinkedList; +import java.util.Queue; import java.util.zip.Checksum; import org.apache.commons.logging.Log; @@ -1208,9 +1209,9 @@ private enum PacketResponderType { * Processes responses from downstream datanodes in the pipeline * and sends back replies to the originator. */ - class PacketResponder implements Runnable, Closeable { + class PacketResponder implements Runnable, Closeable { /** queue for packets waiting for ack - synchronization using monitor lock */ - private final LinkedList ackQueue = new LinkedList(); + private final Queue ackQueue = new ArrayDeque<>(); /** the thread that spawns this responder */ private final Thread receiverThread = Thread.currentThread(); /** is this responder running? - synchronization using monitor lock */ @@ -1264,12 +1265,10 @@ void enqueue(final long seqno, final boolean lastPacketInBlock, final long offsetInBlock, final Status ackStatus) { final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock, System.nanoTime(), ackStatus); - if(LOG.isDebugEnabled()) { - LOG.debug(myString + ": enqueue " + p); - } - synchronized(ackQueue) { + LOG.debug("{}: enqueue {}", this, p); + synchronized (ackQueue) { if (running) { - ackQueue.addLast(p); + ackQueue.add(p); ackQueue.notifyAll(); } } @@ -1321,15 +1320,13 @@ void sendOOBResponse(final Status ackStatus) throws IOException, /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */ Packet waitForAckHead(long seqno) throws InterruptedException { - synchronized(ackQueue) { - while (isRunning() && ackQueue.size() == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug(myString + ": seqno=" + seqno + - " waiting for local datanode to finish write."); - } + synchronized (ackQueue) { + while (isRunning() && ackQueue.isEmpty()) { + LOG.debug("{}: seqno={} waiting for local datanode to finish write.", + myString, seqno); ackQueue.wait(); } - return isRunning() ? ackQueue.getFirst() : null; + return isRunning() ? ackQueue.element() : null; } } @@ -1338,8 +1335,8 @@ Packet waitForAckHead(long seqno) throws InterruptedException { */ @Override public void close() { - synchronized(ackQueue) { - while (isRunning() && ackQueue.size() != 0) { + synchronized (ackQueue) { + while (isRunning() && !ackQueue.isEmpty()) { try { ackQueue.wait(); } catch (InterruptedException e) { @@ -1347,14 +1344,12 @@ public void close() { Thread.currentThread().interrupt(); } } - if(LOG.isDebugEnabled()) { - LOG.debug(myString + ": closing"); - } + LOG.debug("{}: closing", this); running = false; ackQueue.notifyAll(); } - synchronized(this) { + synchronized (this) { running = false; notifyAll(); } @@ -1657,12 +1652,12 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno, /** * Remove a packet from the head of the ack queue - * + * * This should be called only when the ack queue is not empty */ private void removeAckHead() { - synchronized(ackQueue) { - ackQueue.removeFirst(); + synchronized (ackQueue) { + ackQueue.remove(); ackQueue.notifyAll(); } }