HDFS-9794. Streamer threads may leak if failure happens when closing the striped outputstream. Contributed by Jing Zhao.

This commit is contained in:
Jing Zhao 2016-02-12 10:59:31 -08:00
parent b21bbe9ed1
commit f3c91a41a5
2 changed files with 36 additions and 26 deletions

View File

@ -822,7 +822,7 @@ protected void closeThreads(boolean force) throws IOException {
streamer.closeSocket(); streamer.closeSocket();
} catch (Exception e) { } catch (Exception e) {
try { try {
handleCurrentStreamerFailure("force=" + force, e); handleStreamerFailure("force=" + force, e, streamer);
} catch (IOException ioe) { } catch (IOException ioe) {
b.add(ioe); b.add(ioe);
} }
@ -947,38 +947,45 @@ protected synchronized void closeImpl() throws IOException {
} }
try { try {
// flush from all upper layers try {
flushBuffer(); // flush from all upper layers
// if the last stripe is incomplete, generate and write parity cells flushBuffer();
if (generateParityCellsForLastStripe()) { // if the last stripe is incomplete, generate and write parity cells
writeParityCells(); if (generateParityCellsForLastStripe()) {
} writeParityCells();
enqueueAllCurrentPackets(); }
enqueueAllCurrentPackets();
// flush all the data packets // flush all the data packets
flushAllInternals(); flushAllInternals();
// check failures // check failures
checkStreamerFailures(); checkStreamerFailures();
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i); final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) { if (s.isHealthy()) {
try { try {
if (s.getBytesCurBlock() > 0) { if (s.getBytesCurBlock() > 0) {
setCurrentPacketToEmpty(); setCurrentPacketToEmpty();
}
// flush the last "close" packet to Datanode
flushInternal();
} catch (Exception e) {
// TODO for both close and endBlock, we currently do not handle
// failures when sending the last packet. We actually do not need to
// bump GS for this kind of failure. Thus counting the total number
// of failures may be good enough.
} }
// flush the last "close" packet to Datanode
flushInternal();
} catch(Exception e) {
// TODO for both close and endBlock, we currently do not handle
// failures when sending the last packet. We actually do not need to
// bump GS for this kind of failure. Thus counting the total number
// of failures may be good enough.
} }
} }
} finally {
// Failures may happen when flushing data/parity data out. Exceptions
// may be thrown if more than 3 streamers fail, or updatePipeline RPC
// fails. Streamers may keep waiting for the new block/GS information.
// Thus need to force closing these threads.
closeThreads(true);
} }
closeThreads(false);
try (TraceScope ignored = try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) { dfsClient.getTracer().newScope("completeFile")) {
completeFile(currentBlockGroup); completeFile(currentBlockGroup);

View File

@ -426,6 +426,9 @@ Trunk (Unreleased)
HDFS-9789. Correctly update DataNode's scheduled block size when writing HDFS-9789. Correctly update DataNode's scheduled block size when writing
small EC file. (jing9) small EC file. (jing9)
HDFS-9794. Streamer threads may leak if failure happens when closing the
striped outputstream. (jing9)
BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
HDFS-7347. Configurable erasure coding policy for individual files and HDFS-7347. Configurable erasure coding policy for individual files and