From a6f86af39f34f8342587feb16a56857738d3c618 Mon Sep 17 00:00:00 2001
From: Surendra Singh Lilhore <surendralilhore@apache.org>
Date: Wed, 29 Apr 2020 10:58:35 +0530
Subject: [PATCH] HDFS-15210. EC: File write hangs when a DN is shut down by
 an admin command. Contributed by Surendra Singh Lilhore.

(cherry picked from commit db6252b6c3959220c6f985f940e2e731f99d8e30)
---
 .../hadoop/hdfs/DFSStripedOutputStream.java      | 15 +++++++++++++++
 .../apache/hadoop/hdfs/StripedDataStreamer.java  |  3 ++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 2ed11ff20a..7c3965647c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -283,6 +283,7 @@ private void flipDataBuffers() {
   private ExecutorService flushAllExecutor;
   private CompletionService<Void> flushAllExecutorCompletionService;
   private int blockGroupIndex;
+  private long datanodeRestartTimeout;
 
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
@@ -322,6 +323,7 @@ private void flipDataBuffers() {
       streamers.add(streamer);
     }
     currentPackets = new DFSPacket[streamers.size()];
+    datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
     setCurrentStreamer(0);
   }
 
@@ -643,6 +645,11 @@ private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
             "streamer: " + streamer);
         streamer.setExternalError();
         healthySet.add(streamer);
+      } else if (!streamer.streamerClosed()
+          && streamer.getErrorState().hasDatanodeError()
+          && streamer.getErrorState().doWaitForRestart()) {
+        healthySet.add(streamer);
+        failedStreamers.remove(streamer);
       }
     }
     return healthySet;
@@ -707,6 +714,14 @@ private void checkStreamerFailures(boolean isNeedFlushAllPackets)
       for (int i = 0; i < numAllBlocks; i++) {
         coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
       }
+      // Wait for the failed streamers to get notified of the DN restart.
+      if (newFailed.size() != 0) {
+        try {
+          Thread.sleep(datanodeRestartTimeout);
+        } catch (InterruptedException e) {
+          // Do nothing
+        }
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index e78beb1d80..e90e66ace4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -143,7 +143,8 @@ protected void setupPipelineInternal(DatanodeInfo[] nodes,
 
       // set up the pipeline again with the remaining nodes. when a striped
       // data streamer comes here, it must be in external error state.
-      assert getErrorState().hasExternalError();
+      assert getErrorState().hasExternalError()
+          || getErrorState().doWaitForRestart();
       success = createBlockOutputStream(nodes, nodeStorageTypes,
           nodeStorageIDs, newGS, true);
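
Reviewer note (illustration only, not part of the patch): the fix has two
halves. markExternalErrorOnStreamers() now treats a streamer whose DataNode is
merely restarting as recoverable rather than failed, and
checkStreamerFailures() sleeps for the configured
dfs.client.datanode-restart.timeout when failures remain, so a gracefully
restarting DN can rejoin before the pipeline is rebuilt. Below is a minimal
standalone sketch of that pattern; ErrorState, Streamer, and the timeout
constant are simplified stand-ins, not Hadoop's real API.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Minimal sketch of the restart-wait behaviour this patch adds.
 * All names here are simplified stand-ins for illustration.
 */
public class RestartWaitSketch {

  /** Stand-in for the streamer's error state. */
  static class ErrorState {
    final boolean datanodeError;   // the DN reported an error
    final boolean waitForRestart;  // the DN announced a graceful restart

    ErrorState(boolean datanodeError, boolean waitForRestart) {
      this.datanodeError = datanodeError;
      this.waitForRestart = waitForRestart;
    }
  }

  /** Stand-in for one striped data streamer. */
  static class Streamer {
    final ErrorState errorState;

    Streamer(ErrorState errorState) {
      this.errorState = errorState;
    }
  }

  // Plays the role of dfs.client.datanode-restart.timeout (hypothetical value).
  static final long DATANODE_RESTART_TIMEOUT_MS = 3000;

  public static void main(String[] args) throws InterruptedException {
    List<Streamer> failed = new ArrayList<>();
    failed.add(new Streamer(new ErrorState(true, false))); // hard failure
    failed.add(new Streamer(new ErrorState(true, true)));  // restarting DN

    List<Streamer> healthy = new ArrayList<>();

    // Like markExternalErrorOnStreamers(): a streamer whose DN is only
    // restarting is recoverable, so move it back to the healthy set.
    for (Iterator<Streamer> it = failed.iterator(); it.hasNext();) {
      Streamer s = it.next();
      if (s.errorState.datanodeError && s.errorState.waitForRestart) {
        healthy.add(s);
        it.remove();
      }
    }

    // Like checkStreamerFailures(): if failures remain, sleep for the
    // restart timeout so a restarting DN can come back, instead of
    // retrying immediately against a pipeline that cannot yet be rebuilt.
    if (!failed.isEmpty()) {
      Thread.sleep(DATANODE_RESTART_TIMEOUT_MS);
    }

    System.out.println("healthy=" + healthy.size()
        + ", still failed=" + failed.size());
  }
}

Sleeping on the writer thread is a blunt but simple choice: within the restart
window the streamer gets notified and the write resumes, instead of hanging as
described in HDFS-15210.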