From 352d299cf8ebe330d24117df98d1e6a64ae38c26 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Tue, 8 Mar 2016 10:43:17 +0900 Subject: [PATCH] HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun. --- .../java/org/apache/hadoop/hdfs/DFSOutputStream.java | 9 +++++++-- .../main/java/org/apache/hadoop/hdfs/DataStreamer.java | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1c58b28fbf..dc88e08991 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -770,14 +770,19 @@ protected synchronized void closeImpl() throws IOException { flushInternal(); // flush all data to Datanodes // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); - closeThreads(false); + try (TraceScope ignored = dfsClient.getTracer().newScope("completeFile")) { completeFile(lastBlock); } } catch (ClosedChannelException ignored) { } finally { - setClosed(); + // Failures may happen when flushing data. + // Streamers may keep waiting for the new block information. + // Thus need to force closing these threads. + // Don't need to call setClosed() because closeThreads(true) + // calls setClosed() in the finally block. + closeThreads(true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 9d3cb5565f..9ae443d2d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -507,7 +507,7 @@ private void initDataStreaming() { } protected void endBlock() { - LOG.debug("Closing old block " + block); + LOG.debug("Closing old block {}", block); this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); @@ -591,7 +591,7 @@ public void run() { LOG.debug("stage=" + stage + ", " + this); } if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - LOG.debug("Allocating new block: " + this); + LOG.debug("Allocating new block: {}", this); setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { @@ -644,7 +644,7 @@ public void run() { } } - LOG.debug(this + " sending " + one); + LOG.debug("{} sending {}", this, one); // write out data to remote datanode try (TraceScope ignored = dfsClient.getTracer(). @@ -1766,7 +1766,7 @@ void queuePacket(DFSPacket packet) { packet.addTraceParent(Tracer.getCurrentSpanId()); dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); - LOG.debug("Queued " + packet + ", " + this); + LOG.debug("Queued {}, {}", packet, this); dataQueue.notifyAll(); } }