diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 8292d0a026..3dfdbee39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -822,7 +822,7 @@ protected void closeThreads(boolean force) throws IOException {
           streamer.closeSocket();
         } catch (Exception e) {
           try {
-            handleCurrentStreamerFailure("force=" + force, e);
+            handleStreamerFailure("force=" + force, e, streamer);
           } catch (IOException ioe) {
             b.add(ioe);
           }
@@ -947,38 +947,45 @@ protected synchronized void closeImpl() throws IOException {
     }
 
     try {
-      // flush from all upper layers
-      flushBuffer();
-      // if the last stripe is incomplete, generate and write parity cells
-      if (generateParityCellsForLastStripe()) {
-        writeParityCells();
-      }
-      enqueueAllCurrentPackets();
+      try {
+        // flush from all upper layers
+        flushBuffer();
+        // if the last stripe is incomplete, generate and write parity cells
+        if (generateParityCellsForLastStripe()) {
+          writeParityCells();
+        }
+        enqueueAllCurrentPackets();
 
-      // flush all the data packets
-      flushAllInternals();
-      // check failures
-      checkStreamerFailures();
+        // flush all the data packets
+        flushAllInternals();
+        // check failures
+        checkStreamerFailures();
 
-      for (int i = 0; i < numAllBlocks; i++) {
-        final StripedDataStreamer s = setCurrentStreamer(i);
-        if (s.isHealthy()) {
-          try {
-            if (s.getBytesCurBlock() > 0) {
-              setCurrentPacketToEmpty();
+        for (int i = 0; i < numAllBlocks; i++) {
+          final StripedDataStreamer s = setCurrentStreamer(i);
+          if (s.isHealthy()) {
+            try {
+              if (s.getBytesCurBlock() > 0) {
+                setCurrentPacketToEmpty();
+              }
+              // flush the last "close" packet to Datanode
+              flushInternal();
+            } catch (Exception e) {
+              // TODO for both close and endBlock, we currently do not handle
+              // failures when sending the last packet. We actually do not need to
+              // bump GS for this kind of failure. Thus counting the total number
+              // of failures may be good enough.
             }
-            // flush the last "close" packet to Datanode
-            flushInternal();
-          } catch(Exception e) {
-            // TODO for both close and endBlock, we currently do not handle
-            // failures when sending the last packet. We actually do not need to
-            // bump GS for this kind of failure. Thus counting the total number
-            // of failures may be good enough.
           }
         }
+      } finally {
+        // Failures may happen when flushing data/parity data out. Exceptions
+        // may be thrown if more than 3 streamers fail, or updatePipeline RPC
+        // fails. Streamers may keep waiting for the new block/GS information.
+        // Thus need to force closing these threads.
+        closeThreads(true);
       }
-      closeThreads(false);
 
       try (TraceScope ignored =
                dfsClient.getTracer().newScope("completeFile")) {
         completeFile(currentBlockGroup);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8633c31aec..687e35f6c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -426,6 +426,9 @@ Trunk (Unreleased)
     HDFS-9789. Correctly update DataNode's scheduled block size when writing
     small EC file. (jing9)
 
+    HDFS-9794. Streamer threads may leak if failure happens when closing the
+    striped outputstream. (jing9)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
     HDFS-7347. Configurable erasure coding policy for individual files and
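
For readers skimming the patch, here is a minimal, self-contained Java sketch (not part of the commit; the class and method names below are illustrative stand-ins, not HDFS APIs) of the pattern the closeImpl() change applies: forced thread cleanup moves from the success path into a finally block, so streamer threads are shut down even when flushing throws.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class StreamerCloseSketch {
  // Stand-ins for the striped output stream's data/parity streamer threads.
  private final List<Thread> streamers = new CopyOnWriteArrayList<>();

  void close() throws IOException {
    try {
      flushAll(); // may throw, e.g. when too many streamers have failed
    } finally {
      // Runs on both the success and the failure path, so no streamer
      // thread is left waiting for block/GS updates that never arrive.
      for (Thread s : streamers) {
        s.interrupt();
      }
    }
  }

  private void flushAll() throws IOException {
    throw new IOException("simulated flush failure");
  }
}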