diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index df5a479e8b..96c86c3569 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -529,6 +529,7 @@ boolean doWaitForRestart() {
   private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
       CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
   private int lastCongestionBackoffTime;
+  private int maxPipelineRecoveryRetries;
 
   protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
@@ -557,6 +558,7 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
     this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
     this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
     this.addBlockFlags = flags;
+    this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
   }
 
   /**
@@ -1263,14 +1265,18 @@ private boolean processDatanodeOrExternalError() throws IOException {
         packetSendTime.clear();
       }
 
-      // If we had to recover the pipeline five times in a row for the
+      // If we had to recover the pipeline more than the value
+      // defined by maxPipelineRecoveryRetries in a row for the
       // same packet, this client likely has corrupt data or corrupting
       // during transmission.
-      if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
+      if (!errorState.isRestartingNode() && ++pipelineRecoveryCount >
+          maxPipelineRecoveryRetries) {
         LOG.warn("Error recovering pipeline for writing " +
-            block + ". Already retried 5 times for the same packet.");
+            block + ". Already retried " + maxPipelineRecoveryRetries +
+            " times for the same packet.");
         lastException.set(new IOException("Failing write. Tried pipeline " +
-            "recovery 5 times without success."));
+            "recovery " + maxPipelineRecoveryRetries +
+            " times without success."));
         streamerClosed = true;
         return false;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f858080929..c17ad0e861 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -83,6 +83,9 @@ public interface HdfsClientConfigKeys {
       "dfs.namenode.kerberos.principal";
   String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  String DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES =
+      "dfs.client.pipeline.recovery.max-retries";
+  int DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT = 5;
   String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
   String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
       "dfs.client.socket.send.buffer.size";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index facbe70589..f462dca993 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -107,6 +107,7 @@ public class DfsClientConf {
 
   private final int maxFailoverAttempts;
   private final int maxRetryAttempts;
+  private final int maxPipelineRecoveryRetries;
   private final int failoverSleepBaseMillis;
   private final int failoverSleepMaxMillis;
   private final int maxBlockAcquireFailures;
@@ -294,6 +295,10 @@ public DfsClientConf(Configuration conf) {
     Preconditions.checkArgument(clientShortCircuitNum <= 5,
         HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
         "can't be more then 5.");
+    maxPipelineRecoveryRetries = conf.getInt(
+        HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES,
+        HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT
+    );
   }
 
   private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
@@ -698,6 +703,13 @@ public ShortCircuitConf getShortCircuitConf() {
     return shortCircuitConf;
   }
 
+  /**
+   * @return the maxPipelineRecoveryRetries
+   */
+  public int getMaxPipelineRecoveryRetries() {
+    return maxPipelineRecoveryRetries;
+  }
+
   /**
    * Configuration for short-circuit reads.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 56c65b5aff..94ff3ec71e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4370,6 +4370,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.pipeline.recovery.max-retries</name>
+  <value>5</value>
+  <description>
+    If the DFS client encounters errors in the write pipeline,
+    retry up to the number defined by this property before giving up.
+  </description>
+</property>
+
 <property>
   <name>dfs.client-socket-timeout</name>
   <value>60000</value>
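
A minimal usage sketch, not part of the patch above: it shows how a client application could raise the new limit through the key added in HdfsClientConfigKeys. The class name, the path /tmp/pipeline-retry-example, and the value 10 are illustrative assumptions; the higher limit only matters when the write pipeline actually fails and DataStreamer has to recover it.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class PipelineRetryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow up to 10 pipeline recovery attempts for the same packet
        // instead of the default of 5 (illustrative value).
        conf.setInt(
            HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES, 10);

        // Writes issued through this FileSystem pick up the new limit via
        // DfsClientConf.getMaxPipelineRecoveryRetries() in DataStreamer.
        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out =
                 fs.create(new Path("/tmp/pipeline-retry-example"))) {
          out.writeBytes("hello, pipeline recovery\n");
        }
      }
    }

The same effect can be had without code changes by setting dfs.client.pipeline.recovery.max-retries in the client-side hdfs-site.xml, as documented by the hdfs-default.xml entry in the patch.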