HADOOP-19061 Capture exception from rpcRequestSender.start() in IPC.Connection.run() (#6519)

* HADOOP-19061 - Capture exception from rpcRequestSender.start() in IPC.Connection.run() and proper cleaning is followed if an exception is thrown.

---------

Co-authored-by: Xing Lin <xinglin@linkedin.com>
This commit is contained in:
Xing Lin 2024-02-02 16:22:16 -08:00 committed by GitHub
parent d278b349f6
commit d74e5160cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1081,15 +1081,15 @@ private synchronized void sendPing() throws IOException {
@Override @Override
public void run() { public void run() {
// Don't start the ipc parameter sending thread until we start this
// thread, because the shutdown logic only gets triggered if this
// thread is started.
rpcRequestThread.start();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());
try { try {
// Don't start the ipc parameter sending thread until we start this
// thread, because the shutdown logic only gets triggered if this
// thread is started.
rpcRequestThread.start();
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": starting, having connections " + connections.size());
}
while (waitForWork()) {//wait here for work - read or close connection while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse(); receiveRpcResponse();
} }
@ -1097,15 +1097,17 @@ public void run() {
// This truly is unexpected, since we catch IOException in receiveResponse // This truly is unexpected, since we catch IOException in receiveResponse
// -- this is only to be really sure that we don't leave a client hanging // -- this is only to be really sure that we don't leave a client hanging
// forever. // forever.
LOG.warn("Unexpected error reading responses on connection " + this, t); String msg = String.format("Unexpected error on connection %s. Closing it.", this);
markClosed(new IOException("Error reading responses", t)); LOG.warn(msg, t);
markClosed(new IOException(msg, t));
} }
close(); close();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": stopped, remaining connections " LOG.debug(getName() + ": stopped, remaining connections "
+ connections.size()); + connections.size());
}
} }
/** /**