diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6482966fb9..d1d8d37f3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1208,22 +1208,46 @@ class DataStreamer extends Daemon {
       return;
     }
 
-    //get a new datanode
+    int tried = 0;
     final DatanodeInfo[] original = nodes;
-    final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-        src, stat.getFileId(), block, nodes, storageIDs,
-        failed.toArray(new DatanodeInfo[failed.size()]),
-        1, dfsClient.clientName);
-    setPipeline(lb);
+    final StorageType[] originalTypes = storageTypes;
+    final String[] originalIDs = storageIDs;
+    IOException caughtException = null;
+    ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
+    while (tried < 3) {
+      LocatedBlock lb;
+      //get a new datanode
+      lb = dfsClient.namenode.getAdditionalDatanode(
+          src, stat.getFileId(), block, nodes, storageIDs,
+          exclude.toArray(new DatanodeInfo[exclude.size()]),
+          1, dfsClient.clientName);
+      // a new node was allocated by the namenode. Update nodes.
+      setPipeline(lb);
 
-    //find the new datanode
-    final int d = findNewDatanode(original);
+      //find the new datanode
+      final int d = findNewDatanode(original);
+      //transfer replica. pick a source from the original nodes
+      final DatanodeInfo src = original[tried % original.length];
+      final DatanodeInfo[] targets = {nodes[d]};
+      final StorageType[] targetStorageTypes = {storageTypes[d]};
 
-    //transfer replica
-    final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
-    final DatanodeInfo[] targets = {nodes[d]};
-    final StorageType[] targetStorageTypes = {storageTypes[d]};
-    transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+      try {
+        transfer(src, targets, targetStorageTypes, lb.getBlockToken());
+      } catch (IOException ioe) {
+        DFSClient.LOG.warn("Error transferring data from " + src + " to " +
+            nodes[d] + ": " + ioe.getMessage());
+        caughtException = ioe;
+        // add the allocated node to the exclude list.
+        exclude.add(nodes[d]);
+        setPipeline(original, originalTypes, originalIDs);
+        tried++;
+        continue;
+      }
+      return; // finished successfully
+    }
+    // All retries failed
+    throw (caughtException != null) ? caughtException :
+        new IOException("Failed to add a node");
   }
 
   private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
@@ -1236,7 +1260,11 @@ class DataStreamer extends Daemon {
     try {
       sock = createSocketForPipeline(src, 2, dfsClient);
       final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-      final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
+
+      // transfer timeout multiplier based on the transfer size
+      // One per 200 packets = 12.8MB. Minimum is 2.
+      int multi = 2 + (int)(bytesSent/dfsClient.getConf().getWritePacketSize())/200;
+      final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
 
       OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
       InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3571e4a4ca..1d9fa1da31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1488,6 +1488,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-9043. Doc updation for commands in HDFS Federation
     (J.Andreina via vinayakumab)
 
+    HDFS-9106. Transfer failure during pipeline recovery causes permanent
+    write failures (kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES
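
Notes on the change (explanatory sketches, not part of the patch):

The DataStreamer hunk turns the single-shot getAdditionalDatanode() +
transfer() sequence into a bounded retry loop: up to three times the client
asks the namenode for a replacement datanode, excluding every node that has
already failed (including replacements that failed mid-transfer), rotates
the transfer source across the original pipeline nodes, and restores the
original pipeline before retrying; the last caught IOException is rethrown
only after all attempts fail. The standalone Java sketch below isolates
that pattern; Node, Allocator and Transfer are hypothetical stand-ins for
DatanodeInfo, the getAdditionalDatanode() RPC and the datanode-to-datanode
replica copy, not real HDFS APIs.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class PipelineRecoverySketch {
      interface Node {}

      interface Allocator {
        // Returns a replacement target; never returns an excluded node.
        Node allocate(List<Node> exclude) throws IOException;
      }

      interface Transfer {
        void copyReplica(Node source, Node target) throws IOException;
      }

      static void addReplacement(Node[] original, List<Node> failed,
          Allocator allocator, Transfer transfer) throws IOException {
        // Seed the exclude list with the nodes that already failed.
        List<Node> exclude = new ArrayList<>(failed);
        IOException caught = null;
        for (int tried = 0; tried < 3; tried++) {
          Node target = allocator.allocate(exclude);
          // Rotate the source so a retry does not reuse a bad source node.
          Node source = original[tried % original.length];
          try {
            transfer.copyReplica(source, target);
            return; // finished successfully
          } catch (IOException ioe) {
            caught = ioe;
            // Exclude the failed target so the next allocation differs.
            exclude.add(target);
          }
        }
        throw caught != null ? caught : new IOException("Failed to add a node");
      }
    }

Before this change, a single transfer failure (for example, the replacement
node going down during the copy) aborted pipeline recovery and permanently
failed the write, which is the failure mode the HDFS-9106 entry describes.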
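The transfer() hunk scales the read timeout of the temporary copy
connection with the amount of data already written (bytesSent), instead of
using the fixed two-node timeout. The patch comment's "One per 200 packets
= 12.8MB" corresponds to the default 64 KB write packet size
(dfs.client-write-packet-size), so the multiplier has a floor of 2 and
grows by one per 12.8 MB to be transferred. A small sketch of that
arithmetic, assuming the 64 KB default:

    public class TransferTimeoutSketch {
      static int multiplier(long bytesSent, int writePacketSize) {
        // Same formula as the patch: floor of 2, +1 per 200 packets sent.
        return 2 + (int) (bytesSent / writePacketSize) / 200;
      }

      public static void main(String[] args) {
        final int packetSize = 64 * 1024; // assumed 64 KB default
        // 1 MB written: 16 packets, still the minimum multiplier of 2.
        System.out.println(multiplier(1L << 20, packetSize));   // 2
        // 128 MB (one default block): 2048 packets -> 2 + 10 = 12.
        System.out.println(multiplier(128L << 20, packetSize)); // 12
        // 1 GB: 16384 packets -> 2 + 81 = 83.
        System.out.println(multiplier(1L << 30, packetSize));   // 83
      }
    }

The multiplier is then passed to getDatanodeReadTimeout(), so a transfer
that must move a nearly full block no longer times out as if it only had a
couple of packets to hand off.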