diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index b1c55a02e3..8d651d855c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -404,6 +404,7 @@ public class DFSStripedOutputStream extends DFSOutputStream LOG.debug("newly failed streamers: " + newFailed); } if (failCount > (numAllBlocks - numDataBlocks)) { + closeAllStreamers(); throw new IOException("Failed: the number of failed blocks = " + failCount + " > the number of parity blocks = " + (numAllBlocks - numDataBlocks)); @@ -411,6 +412,13 @@ public class DFSStripedOutputStream extends DFSOutputStream return newFailed; } + private void closeAllStreamers() { + // The write has failed, Close all the streamers. + for (StripedDataStreamer streamer : streamers) { + streamer.close(true); + } + } + private void handleCurrentStreamerFailure(String err, Exception e) throws IOException { currentPacket = null; @@ -670,6 +678,8 @@ public class DFSStripedOutputStream extends DFSOutputStream newFailed = waitCreatingStreamers(healthySet); if (newFailed.size() + failedStreamers.size() > numAllBlocks - numDataBlocks) { + // The write has failed, Close all the streamers. + closeAllStreamers(); throw new IOException( "Data streamers failed while creating new block streams: " + newFailed + ". There are not enough healthy streamers."); @@ -1169,32 +1179,32 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { - if (isClosed()) { - exceptionLastSeen.check(true); - - // Writing to at least {dataUnits} replicas can be considered as success, - // and the rest of data can be recovered. - final int minReplication = ecPolicy.getNumDataUnits(); - int goodStreamers = 0; - final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for (final StripedDataStreamer si : streamers) { - try { - si.getLastException().check(true); - goodStreamers++; - } catch (IOException e) { - b.add(e); - } - } - if (goodStreamers < minReplication) { - final IOException ioe = b.build(); - if (ioe != null) { - throw ioe; - } - } - return; - } - try { + if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as + // success, and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + for (final StripedDataStreamer si : streamers) { + try { + si.getLastException().check(true); + goodStreamers++; + } catch (IOException e) { + b.add(e); + } + } + if (goodStreamers < minReplication) { + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + } + return; + } + try { // flush from all upper layers flushBuffer();