From bd3da73a0ff75231340b1168f7805164710bf4fe Mon Sep 17 00:00:00 2001 From: Karthik Palanisamy Date: Sat, 1 May 2021 11:05:31 -0700 Subject: [PATCH] HDFS-15865. Interrupt DataStreamer thread if no ack (#2728) --- .../java/org/apache/hadoop/hdfs/DataStreamer.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 96c86c3569..e04268eddc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -897,6 +897,8 @@ class DataStreamer extends Daemon { try (TraceScope ignored = dfsClient.getTracer(). newScope("waitForAckedSeqno")) { LOG.debug("{} waiting for ack for: {}", this, seqno); + int dnodes = nodes != null ? nodes.length : 3; + int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes); long begin = Time.monotonicNow(); try { synchronized (dataQueue) { @@ -907,6 +909,16 @@ class DataStreamer extends Daemon { } try { dataQueue.wait(1000); // when we receive an ack, we notify on + long duration = Time.monotonicNow() - begin; + if (duration > writeTimeout) { + LOG.error("No ack received, took {}ms (threshold={}ms). " + + "File being written: {}, block: {}, " + + "Write pipeline datanodes: {}.", + duration, writeTimeout, src, block, nodes); + throw new InterruptedIOException("No ack received after " + + duration / 1000 + "s and a timeout of " + + writeTimeout / 1000 + "s"); + } // dataQueue } catch (InterruptedException ie) { throw new InterruptedIOException(