HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.
This commit is contained in:
parent
75cb1d42ab
commit
6ccf4fbf8a
@ -868,6 +868,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7742. Favoring decommissioning node for replication can cause a block
|
||||
to stay underreplicated for long periods (Nathan Roberts via kihwal)
|
||||
|
||||
HDFS-8008. Support client-side back off when the datanodes are congested.
|
||||
(wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||
|
@ -218,6 +218,13 @@ class DataStreamer extends Daemon {
|
||||
private boolean failPacket = false;
|
||||
private final long dfsclientSlowLogThresholdMs;
|
||||
private long artificialSlowdown = 0;
|
||||
// List of congested data nodes. The stream will back off if the DataNodes
|
||||
// are congested
|
||||
private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>();
|
||||
private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
|
||||
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
|
||||
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
|
||||
private int lastCongestionBackoffTime;
|
||||
|
||||
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
|
||||
|
||||
@ -386,6 +393,11 @@ class DataStreamer extends Daemon {
|
||||
one = createHeartbeatPacket();
|
||||
assert one != null;
|
||||
} else {
|
||||
try {
|
||||
backOffIfNecessary();
|
||||
} catch (InterruptedException e) {
|
||||
DFSClient.LOG.warn("Caught exception ", e);
|
||||
}
|
||||
one = dataQueue.getFirst(); // regular data packet
|
||||
long parents[] = one.getTraceParents();
|
||||
if (parents.length > 0) {
|
||||
@ -815,9 +827,14 @@ class DataStreamer extends Daemon {
|
||||
|
||||
long seqno = ack.getSeqno();
|
||||
// processes response status from datanodes.
|
||||
ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
|
||||
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
|
||||
final Status reply = PipelineAck.getStatusFromHeader(ack
|
||||
.getHeaderFlag(i));
|
||||
if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
|
||||
PipelineAck.ECN.CONGESTED) {
|
||||
congestedNodesFromAck.add(targets[i]);
|
||||
}
|
||||
// Restart will not be treated differently unless it is
|
||||
// the local node or the only one in the pipeline.
|
||||
if (PipelineAck.isRestartOOBStatus(reply) &&
|
||||
@ -839,6 +856,18 @@ class DataStreamer extends Daemon {
|
||||
}
|
||||
}
|
||||
|
||||
if (!congestedNodesFromAck.isEmpty()) {
|
||||
synchronized (congestedNodes) {
|
||||
congestedNodes.clear();
|
||||
congestedNodes.addAll(congestedNodesFromAck);
|
||||
}
|
||||
} else {
|
||||
synchronized (congestedNodes) {
|
||||
congestedNodes.clear();
|
||||
lastCongestionBackoffTime = 0;
|
||||
}
|
||||
}
|
||||
|
||||
assert seqno != PipelineAck.UNKOWN_SEQNO :
|
||||
"Ack for unknown seqno should be a failed ack: " + ack;
|
||||
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
|
||||
@ -1543,6 +1572,40 @@ class DataStreamer extends Daemon {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function sleeps for a certain amount of time when the writing
|
||||
* pipeline is congested. The function calculates the time based on a
|
||||
* decorrelated filter.
|
||||
*
|
||||
* @see
|
||||
* <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html">
|
||||
* http://www.awsarchitectureblog.com/2015/03/backoff.html</a>
|
||||
*/
|
||||
private void backOffIfNecessary() throws InterruptedException {
|
||||
int t = 0;
|
||||
synchronized (congestedNodes) {
|
||||
if (!congestedNodes.isEmpty()) {
|
||||
StringBuilder sb = new StringBuilder("DataNode");
|
||||
for (DatanodeInfo i : congestedNodes) {
|
||||
sb.append(' ').append(i);
|
||||
}
|
||||
int range = Math.abs(lastCongestionBackoffTime * 3 -
|
||||
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
|
||||
int base = Math.min(lastCongestionBackoffTime * 3,
|
||||
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
|
||||
t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
|
||||
(int)(base + Math.random() * range));
|
||||
lastCongestionBackoffTime = t;
|
||||
sb.append(" are congested. Backing off for ").append(t).append(" ms");
|
||||
DFSClient.LOG.info(sb.toString());
|
||||
congestedNodes.clear();
|
||||
}
|
||||
}
|
||||
if (t != 0) {
|
||||
Thread.sleep(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get the block this streamer is writing to
|
||||
*
|
||||
|
@ -257,6 +257,10 @@ public class PipelineAck {
|
||||
return StatusFormat.getStatus(header);
|
||||
}
|
||||
|
||||
public static ECN getECNFromHeader(int header) {
|
||||
return StatusFormat.getECN(header);
|
||||
}
|
||||
|
||||
public static int setStatusForHeader(int old, Status status) {
|
||||
return StatusFormat.setStatus(old, status);
|
||||
}
|
||||
|
@ -17,20 +17,31 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestDFSOutputStream {
|
||||
static MiniDFSCluster cluster;
|
||||
|
||||
@ -100,6 +111,37 @@ public class TestDFSOutputStream {
|
||||
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCongestionBackoff() throws IOException {
|
||||
DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class);
|
||||
DFSClient client = mock(DFSClient.class);
|
||||
when(client.getConf()).thenReturn(dfsClientConf);
|
||||
client.clientRunning = true;
|
||||
DataStreamer stream = new DataStreamer(
|
||||
mock(HdfsFileStatus.class),
|
||||
mock(ExtendedBlock.class),
|
||||
client,
|
||||
"foo", null, null, null, null);
|
||||
|
||||
DataOutputStream blockStream = mock(DataOutputStream.class);
|
||||
doThrow(new IOException()).when(blockStream).flush();
|
||||
Whitebox.setInternalState(stream, "blockStream", blockStream);
|
||||
Whitebox.setInternalState(stream, "stage",
|
||||
BlockConstructionStage.PIPELINE_CLOSE);
|
||||
@SuppressWarnings("unchecked")
|
||||
LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
|
||||
Whitebox.getInternalState(stream, "dataQueue");
|
||||
@SuppressWarnings("unchecked")
|
||||
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
|
||||
Whitebox.getInternalState(stream, "congestedNodes");
|
||||
congestedNodes.add(mock(DatanodeInfo.class));
|
||||
DFSPacket packet = mock(DFSPacket.class);
|
||||
when(packet.getTraceParents()).thenReturn(new long[] {});
|
||||
dataQueue.add(packet);
|
||||
stream.run();
|
||||
Assert.assertTrue(congestedNodes.isEmpty());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
cluster.shutdown();
|
||||
|
Loading…
x
Reference in New Issue
Block a user