HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun.

This commit is contained in:
Akira Ajisaka 2016-03-08 10:43:17 +09:00
parent 391da36d93
commit 352d299cf8
2 changed files with 11 additions and 6 deletions

View File

@ -770,14 +770,19 @@ protected synchronized void closeImpl() throws IOException {
flushInternal(); // flush all data to Datanodes flushInternal(); // flush all data to Datanodes
// get last block before destroying the streamer // get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock(); ExtendedBlock lastBlock = getStreamer().getBlock();
closeThreads(false);
try (TraceScope ignored = try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) { dfsClient.getTracer().newScope("completeFile")) {
completeFile(lastBlock); completeFile(lastBlock);
} }
} catch (ClosedChannelException ignored) { } catch (ClosedChannelException ignored) {
} finally { } finally {
setClosed(); // Failures may happen when flushing data.
// Streamers may keep waiting for the new block information.
// Thus need to force closing these threads.
// Don't need to call setClosed() because closeThreads(true)
// calls setClosed() in the finally block.
closeThreads(true);
} }
} }

View File

@ -507,7 +507,7 @@ private void initDataStreaming() {
} }
protected void endBlock() { protected void endBlock() {
LOG.debug("Closing old block " + block); LOG.debug("Closing old block {}", block);
this.setName("DataStreamer for file " + src); this.setName("DataStreamer for file " + src);
closeResponder(); closeResponder();
closeStream(); closeStream();
@ -591,7 +591,7 @@ public void run() {
LOG.debug("stage=" + stage + ", " + this); LOG.debug("stage=" + stage + ", " + this);
} }
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: " + this); LOG.debug("Allocating new block: {}", this);
setPipeline(nextBlockOutputStream()); setPipeline(nextBlockOutputStream());
initDataStreaming(); initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@ -644,7 +644,7 @@ public void run() {
} }
} }
LOG.debug(this + " sending " + one); LOG.debug("{} sending {}", this, one);
// write out data to remote datanode // write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
@ -1766,7 +1766,7 @@ void queuePacket(DFSPacket packet) {
packet.addTraceParent(Tracer.getCurrentSpanId()); packet.addTraceParent(Tracer.getCurrentSpanId());
dataQueue.addLast(packet); dataQueue.addLast(packet);
lastQueuedSeqno = packet.getSeqno(); lastQueuedSeqno = packet.getSeqno();
LOG.debug("Queued " + packet + ", " + this); LOG.debug("Queued {}, {}", packet, this);
dataQueue.notifyAll(); dataQueue.notifyAll();
} }
} }