From df622cf4a32ee172ded6c4b3b97a1e49befc4f10 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 25 Dec 2019 11:07:25 +0530 Subject: [PATCH] HDFS-12999. When reach the end of the block group, it may not need to flush all the data packets(flushAllInternals) twice. Contributed by lufei and Fei Hui. --- .../hadoop/hdfs/DFSStripedOutputStream.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index ff81995ad6..b1c55a02e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -572,7 +572,7 @@ public class DFSStripedOutputStream extends DFSOutputStream // if this is the end of the block group, end each internal block if (shouldEndBlockGroup()) { flushAllInternals(); - checkStreamerFailures(); + checkStreamerFailures(false); for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); if (s.isHealthy()) { @@ -583,7 +583,7 @@ public class DFSStripedOutputStream extends DFSOutputStream } } else { // check failure state for all the streamers. Bump GS if necessary - checkStreamerFailures(); + checkStreamerFailures(true); } } setCurrentStreamer(next); @@ -639,15 +639,18 @@ public class DFSStripedOutputStream extends DFSOutputStream * written a full stripe (i.e., enqueue all packets for a full stripe), or * when we're closing the outputstream. */ - private void checkStreamerFailures() throws IOException { + private void checkStreamerFailures(boolean isNeedFlushAllPackets) + throws IOException { Set newFailed = checkStreamers(); if (newFailed.size() == 0) { return; } - // for healthy streamers, wait till all of them have fetched the new block - // and flushed out all the enqueued packets. - flushAllInternals(); + if (isNeedFlushAllPackets) { + // for healthy streamers, wait till all of them have fetched the new block + // and flushed out all the enqueued packets. + flushAllInternals(); + } // recheck failed streamers again after the flush newFailed = checkStreamers(); while (newFailed.size() > 0) { @@ -1204,7 +1207,7 @@ public class DFSStripedOutputStream extends DFSOutputStream // flush all the data packets flushAllInternals(); // check failures - checkStreamerFailures(); + checkStreamerFailures(false); for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i);