[HDFS-15813] DataStreamer: keep sending heartbeat packets during flush. Contributed by Daryn Sharp and Jim Brennan

This commit is contained in:
Jim Brennan 2021-02-05 22:16:36 +00:00
parent d3c7cb7c38
commit 62389a5a04
2 changed files with 109 additions and 45 deletions

View File

@ -483,6 +483,7 @@ boolean doWaitForRestart() {
private volatile BlockConstructionStage stage; // block construction stage private volatile BlockConstructionStage stage; // block construction stage
protected long bytesSent = 0; // number of bytes that've been sent protected long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile; private final boolean isLazyPersistFile;
private long lastPacket;
/** Nodes have been used in the pipeline before and have failed. */ /** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>(); private final List<DatanodeInfo> failed = new ArrayList<>();
@ -632,6 +633,7 @@ private void initDataStreaming() {
response = new ResponseProcessor(nodes); response = new ResponseProcessor(nodes);
response.start(); response.start();
stage = BlockConstructionStage.DATA_STREAMING; stage = BlockConstructionStage.DATA_STREAMING;
lastPacket = Time.monotonicNow();
} }
protected void endBlock() { protected void endBlock() {
@ -653,7 +655,6 @@ private boolean shouldStop() {
*/ */
@Override @Override
public void run() { public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null; TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) { while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder // if the Responder encountered an error, shutdown Responder
@ -666,44 +667,35 @@ public void run() {
// process datanode IO errors if any // process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError(); boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) { synchronized (dataQueue) {
// wait for a packet to be sent. // wait for a packet to be sent.
long now = Time.monotonicNow(); while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
while ((!shouldStop() && dataQueue.size() == 0 && long timeout = 1000;
(stage != BlockConstructionStage.DATA_STREAMING || if (stage == BlockConstructionStage.DATA_STREAMING) {
now - lastPacket < halfSocketTimeout)) || doSleep) { timeout = sendHeartbeat();
long timeout = halfSocketTimeout - (now-lastPacket); }
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try { try {
dataQueue.wait(timeout); dataQueue.wait(timeout);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Thread interrupted", e); LOG.debug("Thread interrupted", e);
} }
doSleep = false; doSleep = false;
now = Time.monotonicNow();
} }
if (shouldStop()) { if (shouldStop()) {
continue; continue;
} }
// get packet to be sent. // get packet to be sent.
if (dataQueue.isEmpty()) { try {
one = createHeartbeatPacket(); backOffIfNecessary();
} else { } catch (InterruptedException e) {
try { LOG.debug("Thread interrupted", e);
backOffIfNecessary(); }
} catch (InterruptedException e) { one = dataQueue.getFirst(); // regular data packet
LOG.debug("Thread interrupted", e); SpanId[] parents = one.getTraceParents();
} if (parents.length > 0) {
one = dataQueue.getFirst(); // regular data packet scope = dfsClient.getTracer().
SpanId[] parents = one.getTraceParents(); newScope("dataStreamer", parents[0]);
if (parents.length > 0) { scope.getSpan().setParents(parents);
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0]);
scope.getSpan().setParents(parents);
}
} }
} }
@ -731,17 +723,8 @@ public void run() {
if (one.isLastPacketInBlock()) { if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked // wait for all data packets have been successfully acked
synchronized (dataQueue) { waitForAllAcks();
while (!shouldStop() && ackQueue.size() != 0) { if(shouldStop()) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
}
}
if (shouldStop()) {
continue; continue;
} }
stage = BlockConstructionStage.PIPELINE_CLOSE; stage = BlockConstructionStage.PIPELINE_CLOSE;
@ -770,8 +753,7 @@ public void run() {
// write out data to remote datanode // write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) { newScope("DataStreamer#writeTo", spanId)) {
one.writeTo(blockStream); sendPacket(one);
blockStream.flush();
} catch (IOException e) { } catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to // HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already // write to primary DN. If a failed or restarting node has already
@ -782,7 +764,6 @@ public void run() {
errorState.markFirstNodeIfNotMarked(); errorState.markFirstNodeIfNotMarked();
throw e; throw e;
} }
lastPacket = Time.monotonicNow();
// update bytesSent // update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock(); long tmpBytesSent = one.getLastByteOffsetBlock();
@ -797,11 +778,7 @@ public void run() {
// Is this block full? // Is this block full?
if (one.isLastPacketInBlock()) { if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked // wait for the close packet has been acked
synchronized (dataQueue) { waitForAllAcks();
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) { if (shouldStop()) {
continue; continue;
} }
@ -842,6 +819,48 @@ public void run() {
closeInternal(); closeInternal();
} }
private void waitForAllAcks() throws IOException {
// wait until all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && !ackQueue.isEmpty()) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(sendHeartbeat());
} catch (InterruptedException e) {
LOG.debug("Thread interrupted ", e);
}
}
}
}
private void sendPacket(DFSPacket packet) throws IOException {
// write out data to remote datanode
try {
packet.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
}
private long sendHeartbeat() throws IOException {
final long heartbeatInterval = dfsClient.getConf().getSocketTimeout()/2;
long timeout = heartbeatInterval - (Time.monotonicNow() - lastPacket);
if (timeout <= 0) {
sendPacket(createHeartbeatPacket());
timeout = heartbeatInterval;
}
return timeout;
}
private void closeInternal() { private void closeInternal() {
closeResponder(); // close and join closeResponder(); // close and join
closeStream(); closeStream();

View File

@ -245,6 +245,51 @@ public boolean dropHeartbeatPacket() {
} }
} }
/**
* Test to ensure heartbeats continue during a flush in case of
* delayed acks.
*/
@Test
public void testHeartbeatDuringFlush() throws Exception {
// Delay sending acks
DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
@Override
public void delaySendingAckToUpstream(final String upstreamAddr)
throws IOException {
try {
Thread.sleep(3500); // delay longer than socket timeout
} catch (InterruptedException ie) {
throw new IOException("Interrupted while sleeping");
}
}
};
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
// Setting the timeout to be 3 seconds. Heartbeat packet
// should be sent every 1.5 seconds if there is no data traffic.
Configuration conf = new HdfsConfiguration();
conf.set(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, "3000");
MiniDFSCluster cluster = null;
try {
int numDataNodes = 1;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
FSDataOutputStream out = fs.create(new Path("delayedack.dat"), (short)2);
out.write(0x31);
out.hflush();
DataNodeFaultInjector.set(dnFaultInjector); // cause ack delay
out.close();
} finally {
DataNodeFaultInjector.set(oldDnInjector);
if (cluster != null) {
cluster.shutdown();
}
}
}
/** /**
* Test recovery on restart OOB message. It also tests the delivery of * Test recovery on restart OOB message. It also tests the delivery of
* OOB ack originating from the primary datanode. Since there is only * OOB ack originating from the primary datanode. Since there is only