HDFS-12999. When reach the end of the block group, it may not need to flush all the data packets(flushAllInternals) twice. Contributed by lufei and Fei Hui.

This commit is contained in:
Ayush Saxena 2019-12-25 11:07:25 +05:30
parent 40887c9b12
commit df622cf4a3

View File

@ -572,7 +572,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
// if this is the end of the block group, end each internal block // if this is the end of the block group, end each internal block
if (shouldEndBlockGroup()) { if (shouldEndBlockGroup()) {
flushAllInternals(); flushAllInternals();
checkStreamerFailures(); checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i); final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) { if (s.isHealthy()) {
@ -583,7 +583,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
} }
} else { } else {
// check failure state for all the streamers. Bump GS if necessary // check failure state for all the streamers. Bump GS if necessary
checkStreamerFailures(); checkStreamerFailures(true);
} }
} }
setCurrentStreamer(next); setCurrentStreamer(next);
@ -639,15 +639,18 @@ public class DFSStripedOutputStream extends DFSOutputStream
* written a full stripe (i.e., enqueue all packets for a full stripe), or * written a full stripe (i.e., enqueue all packets for a full stripe), or
* when we're closing the outputstream. * when we're closing the outputstream.
*/ */
private void checkStreamerFailures() throws IOException { private void checkStreamerFailures(boolean isNeedFlushAllPackets)
throws IOException {
Set<StripedDataStreamer> newFailed = checkStreamers(); Set<StripedDataStreamer> newFailed = checkStreamers();
if (newFailed.size() == 0) { if (newFailed.size() == 0) {
return; return;
} }
if (isNeedFlushAllPackets) {
// for healthy streamers, wait till all of them have fetched the new block // for healthy streamers, wait till all of them have fetched the new block
// and flushed out all the enqueued packets. // and flushed out all the enqueued packets.
flushAllInternals(); flushAllInternals();
}
// recheck failed streamers again after the flush // recheck failed streamers again after the flush
newFailed = checkStreamers(); newFailed = checkStreamers();
while (newFailed.size() > 0) { while (newFailed.size() > 0) {
@ -1204,7 +1207,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
// flush all the data packets // flush all the data packets
flushAllInternals(); flushAllInternals();
// check failures // check failures
checkStreamerFailures(); checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i); final StripedDataStreamer s = setCurrentStreamer(i);