HDFS-15856: Make write pipeline retry times configurable. (#2721). Contributed by Qi Zhu

Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Reviewed-by: Wei-Chiu Chuang <weichiu@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
zhuqi 2021-03-02 13:16:11 +08:00 committed by GitHub
parent 9501c698f4
commit 1f1a1ef52d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 4 deletions

View File

@ -529,6 +529,7 @@ boolean doWaitForRestart() {
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int lastCongestionBackoffTime; private int lastCongestionBackoffTime;
private int maxPipelineRecoveryRetries;
protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; protected final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
private final String[] favoredNodes; private final String[] favoredNodes;
@ -557,6 +558,7 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry()); this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry());
this.errorState = new ErrorState(conf.getDatanodeRestartTimeout()); this.errorState = new ErrorState(conf.getDatanodeRestartTimeout());
this.addBlockFlags = flags; this.addBlockFlags = flags;
this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
} }
/** /**
@ -1263,14 +1265,18 @@ private boolean processDatanodeOrExternalError() throws IOException {
packetSendTime.clear(); packetSendTime.clear();
} }
// If we had to recover the pipeline five times in a row for the // If we had to recover the pipeline more than the value
// defined by maxPipelineRecoveryRetries in a row for the
// same packet, this client likely has corrupt data or corrupting // same packet, this client likely has corrupt data or corrupting
// during transmission. // during transmission.
if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) { if (!errorState.isRestartingNode() && ++pipelineRecoveryCount >
maxPipelineRecoveryRetries) {
LOG.warn("Error recovering pipeline for writing " + LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet."); block + ". Already retried " + maxPipelineRecoveryRetries
+ " times for the same packet.");
lastException.set(new IOException("Failing write. Tried pipeline " + lastException.set(new IOException("Failing write. Tried pipeline " +
"recovery 5 times without success.")); "recovery " + maxPipelineRecoveryRetries
+ " times without success."));
streamerClosed = true; streamerClosed = true;
return false; return false;
} }

View File

@ -83,6 +83,9 @@ public interface HdfsClientConfigKeys {
"dfs.namenode.kerberos.principal"; "dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
// Limit on how many times in a row the client may recover the write
// pipeline for the same packet before it fails the write.
String DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES =
"dfs.client.pipeline.recovery.max-retries";
int DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT = 5;
String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout"; String DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY = String DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY =
"dfs.client.socket.send.buffer.size"; "dfs.client.socket.send.buffer.size";

View File

@ -107,6 +107,7 @@ public class DfsClientConf {
private final int maxFailoverAttempts; private final int maxFailoverAttempts;
private final int maxRetryAttempts; private final int maxRetryAttempts;
private final int maxPipelineRecoveryRetries;
private final int failoverSleepBaseMillis; private final int failoverSleepBaseMillis;
private final int failoverSleepMaxMillis; private final int failoverSleepMaxMillis;
private final int maxBlockAcquireFailures; private final int maxBlockAcquireFailures;
@ -294,6 +295,10 @@ public DfsClientConf(Configuration conf) {
Preconditions.checkArgument(clientShortCircuitNum <= 5, Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM + HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be more then 5."); "can't be more then 5.");
maxPipelineRecoveryRetries = conf.getInt(
HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES,
HdfsClientConfigKeys.DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES_DEFAULT
);
} }
private ByteArrayManager.Conf loadWriteByteArrayManagerConf( private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
@ -698,6 +703,13 @@ public ShortCircuitConf getShortCircuitConf() {
return shortCircuitConf; return shortCircuitConf;
} }
/**
 * Returns the configured limit on write pipeline recovery retries.
 *
 * @return the maxPipelineRecoveryRetries value read from
 *         {@link HdfsClientConfigKeys#DFS_CLIENT_PIPELINE_RECOVERY_MAX_RETRIES}
 */
public int getMaxPipelineRecoveryRetries() {
return maxPipelineRecoveryRetries;
}
/** /**
* Configuration for short-circuit reads. * Configuration for short-circuit reads.
*/ */

View File

@ -4370,6 +4370,15 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.pipeline.recovery.max-retries</name>
<value>5</value>
<description>
If the DFS client encounters errors in the write pipeline, it retries
pipeline recovery for the same packet up to the number of times defined
by this property before failing the write.
</description>
</property>
<property> <property>
<name>dfs.client.socket-timeout</name> <name>dfs.client.socket-timeout</name>
<value>60000</value> <value>60000</value>