diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4fa578ab6c..d92f5943fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -476,6 +476,7 @@ boolean doWaitForRestart() {
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
   private ResponseProcessor response = null;
+  private final Object nodesLock = new Object();
   private volatile DatanodeInfo[] nodes = null; // list of targets for current block
   private volatile StorageType[] storageTypes = null;
   private volatile String[] storageIDs = null;
@@ -619,7 +620,9 @@ private void setPipeline(LocatedBlock lb) {
 
   private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
       String[] storageIDs) {
-    this.nodes = nodes;
+    synchronized (nodesLock) {
+      this.nodes = nodes;
+    }
     this.storageTypes = storageTypes;
     this.storageIDs = storageIDs;
   }
@@ -916,7 +919,10 @@ void waitForAckedSeqno(long seqno) throws IOException {
     try (TraceScope ignored = dfsClient.getTracer().
         newScope("waitForAckedSeqno")) {
       LOG.debug("{} waiting for ack for: {}", this, seqno);
-      int dnodes = nodes != null ? nodes.length : 3;
+      int dnodes;
+      synchronized (nodesLock) {
+        dnodes = nodes != null ? nodes.length : 3;
+      }
       int writeTimeout = dfsClient.getDatanodeWriteTimeout(dnodes);
       long begin = Time.monotonicNow();
       try {
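
For context: `volatile` only makes each individual read or write of `nodes` atomic. The expression `nodes != null ? nodes.length : 3` reads the field twice, so between the null check and the `.length` dereference another thread calling setPipeline could replace the reference (possibly with null), producing a NullPointerException or an inconsistent length. The patch closes that window by having both the writer (setPipeline) and the compound reader (waitForAckedSeqno) synchronize on the same dedicated lock object. The sketch below is a minimal, self-contained illustration of this pattern; PipelineHolder, setPipeline's simplified signature, and pipelineWidthOrDefault are hypothetical names for this example, not the Hadoop source.

// Minimal sketch of the locking pattern the patch applies
// (hypothetical class; not part of Hadoop).
public class PipelineHolder {
  // Dedicated lock guarding compound accesses to 'nodes'.
  private final Object nodesLock = new Object();
  // 'volatile' keeps plain single reads cheap elsewhere; the lock
  // covers multi-step accesses that need one consistent snapshot.
  private volatile String[] nodes = null;

  void setPipeline(String[] newNodes) {
    synchronized (nodesLock) {
      nodes = newNodes;
    }
  }

  int pipelineWidthOrDefault() {
    synchronized (nodesLock) {
      // The writer holds the same lock, so 'nodes' cannot change
      // between the null check and the .length dereference.
      return nodes != null ? nodes.length : 3;
    }
  }
}

An alternative with the same effect would be to copy the volatile field into a local variable once (DatanodeInfo[] snapshot = nodes;) and test the local; the patch instead uses a lock, which also serializes the read against concurrent pipeline replacement.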