HDFS-15865. Interrupt DataStreamer thread if no ack (#2728)

This commit is contained in:
Karthik Palanisamy 2021-05-01 11:05:31 -07:00 committed by GitHub
parent 803ac4b1a0
commit bd3da73a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -897,6 +897,8 @@ void waitForAckedSeqno(long seqno) throws IOException {
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("waitForAckedSeqno")) { newScope("waitForAckedSeqno")) {
LOG.debug("{} waiting for ack for: {}", this, seqno); LOG.debug("{} waiting for ack for: {}", this, seqno);
int dnodes = nodes != null ? nodes.length : 3;
int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
try { try {
synchronized (dataQueue) { synchronized (dataQueue) {
@ -907,6 +909,16 @@ void waitForAckedSeqno(long seqno) throws IOException {
} }
try { try {
dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue.wait(1000); // when we receive an ack, we notify on
long duration = Time.monotonicNow() - begin;
if (duration > writeTimeout) {
LOG.error("No ack received, took {}ms (threshold={}ms). "
+ "File being written: {}, block: {}, "
+ "Write pipeline datanodes: {}.",
duration, writeTimeout, src, block, nodes);
throw new InterruptedIOException("No ack received after " +
duration / 1000 + "s and a timeout of " +
writeTimeout / 1000 + "s");
}
// dataQueue // dataQueue
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new InterruptedIOException( throw new InterruptedIOException(