diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index d92f5943fd..04d6d9a565 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -528,9 +529,8 @@ boolean doWaitForRestart() { // are congested private final List congestedNodes = new ArrayList<>(); private final Map slowNodeMap = new HashMap<>(); - private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; - private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = - CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; + private int congestionBackOffMeanTimeInMs; + private int congestionBackOffMaxTimeInMs; private int lastCongestionBackoffTime; private int maxPipelineRecoveryRetries; private int markSlowNodeAsBadNodeThreshold; @@ -564,6 +564,35 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, this.addBlockFlags = flags; this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries(); this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold(); + congestionBackOffMeanTimeInMs = dfsClient.getConfiguration().getInt( + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); + congestionBackOffMaxTimeInMs = dfsClient.getConfiguration().getInt( + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); + if (congestionBackOffMeanTimeInMs <= 0) { + LOG.warn("Configuration: {} is not appropriate, using default value: {}", + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); + } + if (congestionBackOffMaxTimeInMs <= 0) { + LOG.warn("Configuration: {} is not appropriate, using default value: {}", + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); + } + if (congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { + LOG.warn("Configuration: {} can not less than {}, using their default values.", + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME); + } + if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 || + congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { + congestionBackOffMeanTimeInMs = + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT; + congestionBackOffMaxTimeInMs = + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT; + } + } /** @@ -1998,10 +2027,10 @@ private void backOffIfNecessary() throws InterruptedException { sb.append(' ').append(i); } int range = Math.abs(lastCongestionBackoffTime * 3 - - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); + congestionBackOffMeanTimeInMs); int base = Math.min(lastCongestionBackoffTime * 3, - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); - t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS, + congestionBackOffMeanTimeInMs); + t = Math.min(congestionBackOffMaxTimeInMs, (int)(base + Math.random() * range)); lastCongestionBackoffTime = t; sb.append(" are congested. Backing off for ").append(t).append(" ms"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 2b511bfc2e..93e214a403 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -287,6 +287,15 @@ public interface HdfsClientConfigKeys { "dfs.client.output.stream.uniq.default.key"; String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT"; + String DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME = + "dfs.client.congestion.backoff.mean.time"; + int DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT = 5000; + + String DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME = + "dfs.client.congestion.backoff.max.time"; + int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT = + DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10; + /** * These are deprecated config keys to client code. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 3ef433d6a4..174f7242bf 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6559,6 +6559,22 @@ If the namespace is DEFAULT, it's best to change this conf to other value. + + dfs.client.congestion.backoff.mean.time + 5000 + + The mean time in milliseconds which is used to compute + client congestion backoff sleep time. + + + + dfs.client.congestion.backoff.max.time + 50000 + + The max time in milliseconds which is used to restrict + the upper limit backoff sleep time for client. + + dfs.client.rbf.observer.read.enable false diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 8b90287e82..0f1b965cc2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -275,6 +275,8 @@ private void runAdjustChunkBoundary( public void testCongestionBackoff() throws IOException { DfsClientConf dfsClientConf = mock(DfsClientConf.class); DFSClient client = mock(DFSClient.class); + Configuration conf = mock(Configuration.class); + when(client.getConfiguration()).thenReturn(conf); when(client.getConf()).thenReturn(dfsClientConf); when(client.getTracer()).thenReturn(FsTracer.get(new Configuration())); client.clientRunning = true; @@ -306,6 +308,8 @@ public void testCongestionBackoff() throws IOException { public void testCongestionAckDelay() { DfsClientConf dfsClientConf = mock(DfsClientConf.class); DFSClient client = mock(DFSClient.class); + Configuration conf = mock(Configuration.class); + when(client.getConfiguration()).thenReturn(conf); when(client.getConf()).thenReturn(dfsClientConf); when(client.getTracer()).thenReturn(FsTracer.get(new Configuration())); client.clientRunning = true; @@ -325,7 +329,7 @@ public void testCongestionAckDelay() { ArrayList congestedNodes = (ArrayList) Whitebox.getInternalState(stream, "congestedNodes"); int backOffMaxTime = (int) - Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS"); + Whitebox.getInternalState(stream, "congestionBackOffMaxTimeInMs"); DFSPacket[] packet = new DFSPacket[100]; AtomicBoolean isDelay = new AtomicBoolean(true);